http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1b3e111..fbd9f51 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -18,6 +18,12 @@ */ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -37,6 +43,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; @@ -67,16 +74,6 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; -import com.google.common.annotations.VisibleForTesting; - -// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. -// Internally, we use shaded protobuf. This below are part of our public API. -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.ServiceException; -import com.google.protobuf.Service; -// SEE ABOVE NOTE! - /** * An implementation of {@link Table}. Used to communicate with a single HBase table. * Lightweight. Get as needed and just close when done. @@ -414,16 +411,23 @@ public class HTable implements Table { if (get.getConsistency() == Consistency.STRONG) { // Good old call. - final Get configuredGet = get; + final Get getReq = get; RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - this.rpcControllerFactory, getName(), get.getRow()) { + getName(), get.getRow()) { @Override - protected Result call(PayloadCarryingRpcController controller) throws Exception { - ClientProtos.GetRequest request = RequestConverter.buildGetRequest( - getLocation().getRegionInfo().getRegionName(), configuredGet); - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + public Result call(int callTimeout) throws IOException { + ClientProtos.GetRequest request = + RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) return null; + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } }; return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable, @@ -439,6 +443,7 @@ public class HTable implements Table { return callable.call(operationTimeout); } + /** * {@inheritDoc} */ @@ -449,14 +454,16 @@ public class HTable implements Table { } try { Object[] r1 = new Object[gets.size()]; - batch((List<? extends Row>)gets, r1); - // Translate. + batch((List) gets, r1); + + // translate. Result [] results = new Result[r1.length]; - int i = 0; - for (Object obj: r1) { - // Batch ensures if there is a failure we get an exception instead - results[i++] = (Result)obj; + int i=0; + for (Object o : r1) { + // batch ensures if there is a failure we get an exception instead + results[i++] = (Result) o; } + return results; } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException().initCause(e); @@ -504,13 +511,21 @@ public class HTable implements Table { public void delete(final Delete delete) throws IOException { RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection, - this.rpcControllerFactory, getName(), delete.getRow()) { + tableName, delete.getRow()) { @Override - protected Boolean call(PayloadCarryingRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } }; rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, @@ -566,28 +581,41 @@ public class HTable implements Table { */ @Override public void mutateRow(final RowMutations rm) throws IOException { + final RetryingTimeTracker tracker = new RetryingTimeTracker(); PayloadCarryingServerCallable<MultiResponse> callable = - new PayloadCarryingServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), + new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), rpcControllerFactory) { - @Override - protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception { - RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( - getLocation().getRegionInfo().getRegionName(), rm); - regionMutationBuilder.setAtomic(true); - MultiRequest request = - MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - ClientProtos.MultiResponse response = getStub().multi(controller, request); - ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); - if (res.hasException()) { - Throwable ex = ProtobufUtil.toException(res.getException()); - if (ex instanceof IOException) { - throw (IOException) ex; + @Override + public MultiResponse call(int callTimeout) throws IOException { + tracker.start(); + controller.setPriority(tableName); + int remainingTime = tracker.getRemainingTime(callTimeout); + if (remainingTime == 0) { + throw new DoNotRetryIOException("Timeout for mutate row"); + } + controller.setCallTimeout(remainingTime); + try { + RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( + getLocation().getRegionInfo().getRegionName(), rm); + regionMutationBuilder.setAtomic(true); + MultiRequest request = + MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + ClientProtos.MultiResponse response = getStub().multi(controller, request); + ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); + if (res.hasException()) { + Throwable ex = ProtobufUtil.toException(res.getException()); + if (ex instanceof IOException) { + throw (IOException) ex; + } + throw new IOException("Failed to mutate row: " + + Bytes.toStringBinary(rm.getRow()), ex); + } + return ResponseConverter.getResults(request, response, controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex); } - return ResponseConverter.getResults(request, response, controller.cellScanner()); - } - }; + }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), null, null, callable, operationTimeout); ars.waitUntilDone(); @@ -596,31 +624,38 @@ public class HTable implements Table { } } - private static void checkHasFamilies(final Mutation mutation) throws IOException { - if (mutation.numFamilies() == 0) { - throw new IOException("Invalid arguments to " + mutation + ", zero columns specified"); - } - } - /** * {@inheritDoc} */ @Override public Result append(final Append append) throws IOException { - checkHasFamilies(append); - RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - this.rpcControllerFactory, getName(), append.getRow()) { - @Override - protected Result call(PayloadCarryingRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNewNonce()); - MutateResponse response = getStub().mutate(controller, request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } - }; - return rpcCallerFactory.<Result> newCaller(this.rpcTimeout). - callWithRetries(callable, this.operationTimeout); + if (append.numFamilies() == 0) { + throw new IOException( + "Invalid arguments to append, no columns specified"); + } + + NonceGenerator ng = this.connection.getNonceGenerator(); + final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); + RegionServerCallable<Result> callable = + new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) { + @Override + public Result call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); + MutateResponse response = getStub().mutate(controller, request); + if (!response.hasResult()) return null; + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -628,16 +663,27 @@ public class HTable implements Table { */ @Override public Result increment(final Increment increment) throws IOException { - checkHasFamilies(increment); + if (!increment.hasFamilies()) { + throw new IOException( + "Invalid arguments to increment, no columns specified"); + } + NonceGenerator ng = this.connection.getNonceGenerator(); + final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - this.rpcControllerFactory, getName(), increment.getRow()) { + getName(), increment.getRow()) { @Override - protected Result call(PayloadCarryingRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNewNonce()); - MutateResponse response = getStub().mutate(controller, request); - // Should this check for null like append does? - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + public Result call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce); + MutateResponse response = getStub().mutate(controller, request); + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } }; return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, @@ -676,20 +722,28 @@ public class HTable implements Table { NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); - RegionServerCallable<Long> callable = new RegionServerCallable<Long>(this.connection, - this.rpcControllerFactory, getName(), row) { - @Override - protected Long call(PayloadCarryingRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildIncrementRequest( - getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); - } - }; - return rpcCallerFactory.<Long> newCaller(rpcTimeout). - callWithRetries(callable, this.operationTimeout); + RegionServerCallable<Long> callable = + new RegionServerCallable<Long>(connection, getName(), row) { + @Override + public Long call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = RequestConverter.buildIncrementRequest( + getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, amount, durability, nonceGroup, nonce); + MutateResponse response = getStub().mutate(controller, request); + Result result = + ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -700,19 +754,26 @@ public class HTable implements Table { final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { - RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, - this.rpcControllerFactory, getName(), row) { - @Override - protected Boolean call(PayloadCarryingRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, put); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } - }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout). - callWithRetries(callable, this.operationTimeout); + RegionServerCallable<Boolean> callable = + new RegionServerCallable<Boolean>(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), CompareType.EQUAL, put); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -723,42 +784,57 @@ public class HTable implements Table { final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException { - RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, - this.rpcControllerFactory, getName(), row) { - @Override - protected Boolean call(PayloadCarryingRpcController controller) throws Exception { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, put); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } - }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout). - callWithRetries(callable, this.operationTimeout); + RegionServerCallable<Boolean> callable = + new RegionServerCallable<Boolean>(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, put); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** * {@inheritDoc} */ @Override - public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final byte [] value, final Delete delete) + public boolean checkAndDelete(final byte [] row, + final byte [] family, final byte [] qualifier, final byte [] value, + final Delete delete) throws IOException { - RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, - this.rpcControllerFactory, getName(), row) { - @Override - protected Boolean call(PayloadCarryingRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } - }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout). - callWithRetries(callable, this.operationTimeout); + RegionServerCallable<Boolean> callable = + new RegionServerCallable<Boolean>(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), CompareType.EQUAL, delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -769,18 +845,25 @@ public class HTable implements Table { final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException { - RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, - this.rpcControllerFactory, getName(), row) { - @Override - protected Boolean call(PayloadCarryingRpcController controller) throws Exception { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } - }; + RegionServerCallable<Boolean> callable = + new RegionServerCallable<Boolean>(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -792,28 +875,40 @@ public class HTable implements Table { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { + final RetryingTimeTracker tracker = new RetryingTimeTracker(); PayloadCarryingServerCallable<MultiResponse> callable = new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), rpcControllerFactory) { @Override - protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MultiRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, rm); - ClientProtos.MultiResponse response = getStub().multi(controller, request); - ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); - if (res.hasException()) { - Throwable ex = ProtobufUtil.toException(res.getException()); - if (ex instanceof IOException) { - throw (IOException)ex; + public MultiResponse call(int callTimeout) throws IOException { + tracker.start(); + controller.setPriority(tableName); + int remainingTime = tracker.getRemainingTime(callTimeout); + if (remainingTime == 0) { + throw new DoNotRetryIOException("Timeout for mutate row"); + } + controller.setCallTimeout(remainingTime); + try { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MultiRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, rm); + ClientProtos.MultiResponse response = getStub().multi(controller, request); + ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); + if (res.hasException()) { + Throwable ex = ProtobufUtil.toException(res.getException()); + if(ex instanceof IOException) { + throw (IOException)ex; + } + throw new IOException("Failed to checkAndMutate row: "+ + Bytes.toStringBinary(rm.getRow()), ex); } - throw new IOException("Failed to checkAndMutate row: "+ Bytes.toStringBinary(rm.getRow()), ex); + return ResponseConverter.getResults(request, response, controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - return ResponseConverter.getResults(request, response, controller.cellScanner()); } }; - /** * Currently, we use one array to store 'processed' flag which is returned by server. * It is excessive to send such a large array, but that is required by the framework right now @@ -873,6 +968,7 @@ public class HTable implements Table { } /** + * {@inheritDoc} * @throws IOException */ void flushCommits() throws IOException { @@ -1049,18 +1145,19 @@ public class HTable implements Table { for (final byte[] r : keys) { final RegionCoprocessorRpcChannel channel = new RegionCoprocessorRpcChannel(connection, tableName, r); - Future<R> future = pool.submit(new Callable<R>() { - @Override - public R call() throws Exception { - T instance = ProtobufUtil.newServiceStub(service, channel); - R result = callable.call(instance); - byte[] region = channel.getLastRegion(); - if (callback != null) { - callback.update(region, r, result); - } - return result; - } - }); + Future<R> future = pool.submit( + new Callable<R>() { + @Override + public R call() throws Exception { + T instance = ProtobufUtil.newServiceStub(service, channel); + R result = callable.call(instance); + byte[] region = channel.getLastRegion(); + if (callback != null) { + callback.update(region, r, result); + } + return result; + } + }); futures.put(r, future); } for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { @@ -1113,6 +1210,9 @@ public class HTable implements Table { return tableName + ";" + connection; } + /** + * {@inheritDoc} + */ @Override public <R extends Message> Map<byte[], R> batchCoprocessorService( Descriptors.MethodDescriptor methodDescriptor, Message request, @@ -1121,13 +1221,14 @@ public class HTable implements Table { Bytes.BYTES_COMPARATOR)); batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, new Callback<R>() { - @Override - public void update(byte[] region, byte[] row, R result) { - if (region != null) { - results.put(region, result); - } - } - }); + + @Override + public void update(byte[] region, byte[] row, R result) { + if (region != null) { + results.put(region, result); + } + } + }); return results; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java index ae62255..66d3c21 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java @@ -21,24 +21,16 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; - /** * A RetryingCallable for master operations. * @param <V> return type */ -// Like RegionServerCallable abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable { protected ClusterConnection connection; protected MasterKeepAliveConnection master; - private final PayloadCarryingRpcController rpcController; - MasterCallable(final Connection connection, - final RpcControllerFactory rpcConnectionFactory) { + public MasterCallable(final Connection connection) { this.connection = (ClusterConnection) connection; - this.rpcController = rpcConnectionFactory.newController(); } @Override @@ -67,31 +59,4 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable { public long sleep(long pause, int tries) { return ConnectionUtils.getPauseTime(pause, tries); } - - /** - * Override that changes Exception from {@link Exception} to {@link IOException}. It also does - * setup of an rpcController and calls through to the unimplemented - * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. - */ - @Override - // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate - // and so we contain references to protobuf. We can't set priority on the rpcController as - // we do in RegionServerCallable because we don't always have a Table when we call. - public V call(int callTimeout) throws IOException { - try { - this.rpcController.setCallTimeout(callTimeout); - return call(this.rpcController); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } - - /** - * Run RPC call. - * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a - * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this - * class. - * @throws Exception - */ - protected abstract V call(PayloadCarryingRpcController rpcController) throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java index 47693f4..e445b78 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java @@ -33,7 +33,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; * against the master on the MasterProtos.MasterService.BlockingInterface; but not by * final user code. Hence it's package protected. */ -interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface { +interface MasterKeepAliveConnection +extends MasterProtos.MasterService.BlockingInterface { // Do this instead of implement Closeable because closeable returning IOE is PITA. void close(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index a3162f4..e764ceb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -30,9 +30,8 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -42,14 +41,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ServiceException; /** * Callable that handles the <code>multi</code> method call going against a single - * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a - * RegionServerCallable that goes against multiple regions). + * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a + * {@link RegionServerCallable} that goes against multiple regions. * @param <R> */ -@InterfaceAudience.Private class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> { private final MultiAction<R> multiAction; private final boolean cellBlock; @@ -80,7 +79,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse } @Override - protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception { + public MultiResponse call(int callTimeout) throws IOException { int countOfActions = this.multiAction.size(); if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions"); MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder(); @@ -99,8 +98,10 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse regionActionBuilder.clear(); regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName)); + + if (this.cellBlock) { - // Pre-size. Presume at least a KV per Action. There are likely more. + // Presize. Presume at least a KV per Action. There are likely more. if (cells == null) cells = new ArrayList<CellScannable>(countOfActions); // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations. // They have already been handled above. Guess at count of cells @@ -115,18 +116,18 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse // Controller optionally carries cell data over the proxy/service boundary and also // optionally ferries cell response data back out again. - PayloadCarryingRpcController payloadCarryingRpcController = null; - if (cells != null) { - // Cast. Will fail if we have been passed wrong RpcController type. - payloadCarryingRpcController = (PayloadCarryingRpcController)controller; - payloadCarryingRpcController.setCellScanner(CellUtil.createCellScanner(cells)); - } + if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells)); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); ClientProtos.MultiResponse responseProto; ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); - responseProto = getStub().multi(controller, requestProto); + try { + responseProto = getStub().multi(controller, requestProto); + } catch (ServiceException e) { + throw ProtobufUtil.getRemoteException(e); + } if (responseProto == null) return null; // Occurs on cancel - return ResponseConverter.getResults(requestProto, responseProto, - payloadCarryingRpcController == null? null: payloadCarryingRpcController.cellScanner()); + return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner()); } /** @@ -150,4 +151,4 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse ServerName getServerName() { return location.getServerName(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java index 83d857b..d94f069 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java @@ -16,51 +16,33 @@ */ package org.apache.hadoop.hbase.client; -import java.io.IOException; - -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; /** - * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use - * AsyncProcess directly though this class. Also adds global timeout tracking on top of - * RegionServerCallable and implements Cancellable. + * This class is used to unify HTable calls with AsyncProcess Framework. + * HTable can use AsyncProcess directly though this class. */ @InterfaceAudience.Private -abstract class PayloadCarryingServerCallable<T> extends RegionServerCallable<T> - implements Cancellable { - private final RetryingTimeTracker tracker = new RetryingTimeTracker(); - - PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, - RpcControllerFactory rpcControllerFactory) { - super(connection, rpcControllerFactory, tableName, row); - } +public abstract class PayloadCarryingServerCallable<T> + extends RegionServerCallable<T> implements Cancellable { + protected PayloadCarryingRpcController controller; - /* Override so can mess with the callTimeout. - * (non-Javadoc) - * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int) - */ - @Override - public T call(int callTimeout) throws IOException { - // It is expected (it seems) that tracker.start can be called multiple times (on each trip - // through the call when retrying). Also, we can call start and no need of a stop. - this.tracker.start(); - int remainingTime = tracker.getRemainingTime(callTimeout); - if (remainingTime == 0) { - throw new DoNotRetryIOException("Timeout for mutate row"); - } - return super.call(remainingTime); + public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, + RpcControllerFactory rpcControllerFactory) { + super(connection, tableName, row); + this.controller = rpcControllerFactory.newController(); } @Override public void cancel() { - getRpcController().startCancel(); + controller.startCancel(); } @Override public boolean isCancelled() { - return getRpcController().isCanceled(); + return controller.isCanceled(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java index 4e347dd..54c93a0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java @@ -27,30 +27,31 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.util.Bytes; /** - * Similar to RegionServerCallable but for the AdminService interface. This service callable + * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable * assumes a Table and row and thus does region locating similar to RegionServerCallable. - * Works against Admin stub rather than Client stub. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", justification="stub used by ipc") @InterfaceAudience.Private public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> { - protected AdminService.BlockingInterface stub; - protected final RpcControllerFactory rpcControllerFactory; - private PayloadCarryingRpcController controller = null; protected final ClusterConnection connection; + + protected final RpcControllerFactory rpcControllerFactory; + + protected AdminService.BlockingInterface stub; + protected HRegionLocation location; + protected final TableName tableName; protected final byte[] row; protected final int replicaId; + protected final static int MIN_WAIT_DEAD_SERVER = 10000; public RegionAdminServiceCallable(ClusterConnection connection, @@ -81,13 +82,16 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable< if (Thread.interrupted()) { throw new InterruptedIOException(); } + if (reload || location == null) { location = getLocation(!reload); } + if (location == null) { // With this exception, there will be a retry. throw new HBaseIOException(getExceptionMessage()); } + this.setStub(connection.getAdmin(location.getServerName())); } @@ -163,39 +167,7 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable< if (rl == null) { throw new RetriesExhaustedException("Can't get the locations"); } - return rl; - } - - /** - * Override that changes Exception from {@link Exception} to {@link IOException}. It also does - * setup of an rpcController and calls through to the unimplemented - * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. - */ - @Override - // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate - // and so we contain references to protobuf. We can't set priority on the rpcController as - // we do in RegionServerCallable because we don't always have a Table when we call. - public T call(int callTimeout) throws IOException { - this.controller = rpcControllerFactory.newController(); - this.controller.setPriority(this.tableName); - this.controller.setCallTimeout(callTimeout); - try { - return call(this.controller); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } - PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() { - return this.controller; + return rl; } - - /** - * Run RPC call. - * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a - * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this - * class. - * @throws Exception - */ - protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 861b375..d878bae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -1,4 +1,5 @@ /** + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,20 +24,12 @@ import java.io.IOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; /** - * Implementations make an rpc call against a RegionService via a protobuf Service. - * Implement #rpcCall(RpcController) and then call {@link #call(int)} to - * trigger the rpc. The {@link #call(int)} eventually invokes your - * #rpcCall(RpcController) meanwhile saving you having to write a bunch of - * boilerplate. The {@link #call(int)} implementation is from {@link RpcRetryingCaller} so rpcs are - * retried on fail. - * - * <p>TODO: this class is actually tied to one region, because most of the paths make use of + * Implementations call a RegionServer and implement {@link #call(int)}. + * Passed to a {@link RpcRetryingCaller} so we retry on fail. + * TODO: this class is actually tied to one region, because most of the paths make use of * the regioninfo part of location when building requests. The only reason it works for * multi-region requests (e.g. batch) is that they happen to not use the region parts. * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, @@ -44,27 +37,18 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; * @param <T> the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> { +public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements + RetryingCallable<T> { + private ClientService.BlockingInterface stub; - private final PayloadCarryingRpcController rpcController; /** * @param connection Connection to use. * @param tableName Table name to which <code>row</code> belongs. * @param row The row we want in <code>tableName</code>. */ - public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, - TableName tableName, byte [] row) { - this(connection, rpcControllerFactory.newController(), tableName, row); - } - - public RegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController, - TableName tableName, byte [] row) { + public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { super(connection, tableName, row); - this.rpcController = rpcController; - if (this.rpcController != null) { - this.rpcController.setPriority(tableName); - } } void setClientByServiceName(ServerName service) throws IOException { @@ -85,42 +69,4 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab void setStub(final ClientService.BlockingInterface stub) { this.stub = stub; } - - /** - * Override that changes Exception from {@link Exception} to {@link IOException}. It also does - * setup of an rpcController and calls through to the unimplemented - * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. - */ - @Override - public T call(int callTimeout) throws IOException { - if (this.rpcController != null) { - this.rpcController.setCallTimeout(callTimeout); - } - try { - return call(this.rpcController); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } - - /** - * Run RPC call. - * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a - * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this - * class. - * @throws Exception - */ - protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; - - public PayloadCarryingRpcController getRpcController() { - return this.rpcController; - } - - long getNonceGroup() { - return getConnection().getNonceGenerator().getNonceGroup(); - } - - long getNewNonce() { - return getConnection().getNonceGenerator().newNonce(); - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java index b9438e6..24288e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; * Tracks the amount of time remaining for an operation. */ class RetryingTimeTracker { + private long globalStartTime = -1; public void start() { @@ -37,19 +38,16 @@ class RetryingTimeTracker { if (callTimeout == Integer.MAX_VALUE) { return Integer.MAX_VALUE; } - long remaining = EnvironmentEdgeManager.currentTime() - this.globalStartTime; - long remainingTime = callTimeout - remaining; + int remainingTime = (int) ( + callTimeout - + (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); if (remainingTime < 1) { // If there is no time left, we're trying anyway. It's too late. // 0 means no timeout, and it's not the intent here. So we secure both cases by // resetting to the minimum. remainingTime = 1; } - if (remainingTime > Integer.MAX_VALUE) { - throw new RuntimeException("remainingTime=" + remainingTime + - " which is > Integer.MAX_VALUE"); - } - return (int)remainingTime; + return remainingTime; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index 644337d..0c2d345 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -176,9 +176,9 @@ public class ReversedScannerCallable extends ScannerCallable { @Override public ScannerCallable getScannerCallableForReplica(int id) { - ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName, + ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName, this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id); r.setCaching(this.getCaching()); return r; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java deleted file mode 100644 index 68a4aa2..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.client; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; - -/** - * A RetryingCallable for RPC connection operations. - * @param <V> return type - */ -abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable { - @Override - public void prepare(boolean reload) throws IOException { - } - - @Override - public void close() throws IOException { - } - - @Override - public void throwable(Throwable t, boolean retrying) { - } - - @Override - public String getExceptionMessageAdditionalDetail() { - return ""; - } - - @Override - public long sleep(long pause, int tries) { - return ConnectionUtils.getPauseTime(pause, tries); - } - - @Override - // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate - // and so we contain references to protobuf. - public V call(int callTimeout) throws IOException { - try { - return rpcCall(callTimeout); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } - - protected abstract V rpcCall(int callTimeout) throws Exception; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 2b2e4c8..b4cd2ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -22,6 +22,9 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import java.io.IOException; +/** + * + */ @InterfaceAudience.Public @InterfaceStability.Evolving public interface RpcRetryingCaller<T> { @@ -49,4 +52,4 @@ public interface RpcRetryingCaller<T> { */ T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index f92aeae..1c723c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -36,7 +36,6 @@ public class RpcRetryingCallerFactory { private final int rpcTimeout; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; - /* These below data members are UNUSED!!!*/ private final boolean enableBackPressure; private ServerStatisticTracker stats; http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 2785648..65dbb10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -29,6 +29,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import com.google.protobuf.ServiceException; + /** * Caller that goes to replica if the primary region does no answer within a configurable @@ -53,6 +57,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; */ @InterfaceAudience.Private public class RpcRetryingCallerWithReadReplicas { + private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class); + protected final ExecutorService pool; protected final ClusterConnection cConnection; protected final Configuration conf; @@ -92,7 +98,7 @@ public class RpcRetryingCallerWithReadReplicas { private final PayloadCarryingRpcController controller; public ReplicaRegionServerCallable(int id, HRegionLocation location) { - super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory, + super(RpcRetryingCallerWithReadReplicas.this.cConnection, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); this.id = id; this.location = location; @@ -135,20 +141,28 @@ public class RpcRetryingCallerWithReadReplicas { } @Override - protected Result call(PayloadCarryingRpcController controller) throws Exception { + public Result call(int callTimeout) throws Exception { if (controller.isCanceled()) return null; + if (Thread.interrupted()) { throw new InterruptedIOException(); } + byte[] reg = location.getRegionInfo().getRegionName(); + ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); controller.setCallTimeout(callTimeout); - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) { - return null; + + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) { + return null; + } + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 1689d11..72d69ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -52,6 +52,9 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; + /** * Scanner operations such as create, next, etc. * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as @@ -71,6 +74,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected boolean renew = false; private Scan scan; private int caching = 1; + protected final ClusterConnection cConnection; protected ScanMetrics scanMetrics; private boolean logScannerActivity = false; private int logCutOffLatency = 1000; @@ -121,8 +125,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { */ public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { - super(connection, rpcControllerFactory, tableName, scan.getStartRow()); + super(connection, tableName, scan.getStartRow()); this.id = id; + this.cConnection = connection; this.scan = scan; this.scanMetrics = scanMetrics; Configuration conf = connection.getConfiguration(); @@ -180,16 +185,25 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { } } - protected Result [] call(PayloadCarryingRpcController controller) throws Exception { + + @Override + public Result [] call(int callTimeout) throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException(); } - if (this.closed) { - if (this.scannerId != -1) { + + if (controller == null) { + controller = controllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + } + + if (closed) { + if (scannerId != -1) { close(); } } else { - if (this.scannerId == -1L) { + if (scannerId == -1L) { this.scannerId = openScanner(); } else { Result [] rrs = null; @@ -198,56 +212,61 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { setHeartbeatMessage(false); try { incRPCcallsMetrics(); - request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, + request = + RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew); ScanResponse response = null; - response = getStub().scan(controller, request); - // Client and RS maintain a nextCallSeq number during the scan. Every next() call - // from client to server will increment this number in both sides. Client passes this - // number along with the request and at RS side both the incoming nextCallSeq and its - // nextCallSeq will be matched. In case of a timeout this increment at the client side - // should not happen. If at the server side fetching of next batch of data was over, - // there will be mismatch in the nextCallSeq number. Server will throw - // OutOfOrderScannerNextException and then client will reopen the scanner with startrow - // as the last successfully retrieved row. - // See HBASE-5974 - nextCallSeq++; - long timestamp = System.currentTimeMillis(); - setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); - // Results are returned via controller - CellScanner cellScanner = controller.cellScanner(); - rrs = ResponseConverter.getResults(cellScanner, response); - if (logScannerActivity) { - long now = System.currentTimeMillis(); - if (now - timestamp > logCutOffLatency) { - int rows = rrs == null ? 0 : rrs.length; - LOG.info("Took " + (now-timestamp) + "ms to fetch " + try { + response = getStub().scan(controller, request); + // Client and RS maintain a nextCallSeq number during the scan. Every next() call + // from client to server will increment this number in both sides. Client passes this + // number along with the request and at RS side both the incoming nextCallSeq and its + // nextCallSeq will be matched. In case of a timeout this increment at the client side + // should not happen. If at the server side fetching of next batch of data was over, + // there will be mismatch in the nextCallSeq number. Server will throw + // OutOfOrderScannerNextException and then client will reopen the scanner with startrow + // as the last successfully retrieved row. + // See HBASE-5974 + nextCallSeq++; + long timestamp = System.currentTimeMillis(); + setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); + // Results are returned via controller + CellScanner cellScanner = controller.cellScanner(); + rrs = ResponseConverter.getResults(cellScanner, response); + if (logScannerActivity) { + long now = System.currentTimeMillis(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 0 : rrs.length; + LOG.info("Took " + (now-timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId); + } } - } - updateServerSideMetrics(response); - // moreResults is only used for the case where a filter exhausts all elements - if (response.hasMoreResults() && !response.getMoreResults()) { - this.scannerId = -1L; - this.closed = true; - // Implied that no results were returned back, either. - return null; - } - // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due - // to size or quantity of results in the response. - if (response.hasMoreResultsInRegion()) { - // Set what the RS said - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - // Server didn't respond whether it has more results or not. - setHasMoreResultsContext(false); + updateServerSideMetrics(response); + // moreResults is only used for the case where a filter exhausts all elements + if (response.hasMoreResults() && !response.getMoreResults()) { + scannerId = -1L; + closed = true; + // Implied that no results were returned back, either. + return null; + } + // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due + // to size or quantity of results in the response. + if (response.hasMoreResultsInRegion()) { + // Set what the RS said + setHasMoreResultsContext(true); + setServerHasMoreResults(response.getMoreResultsInRegion()); + } else { + // Server didn't respond whether it has more results or not. + setHasMoreResultsContext(false); + } + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } updateResultsMetrics(rrs); } catch (IOException e) { if (logScannerActivity) { - LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " + - getLocation(), e); + LOG.info("Got exception making request " + TextFormat.shortDebugString(request) + + " to " + getLocation(), e); } IOException ioe = e; if (e instanceof RemoteException) { @@ -256,9 +275,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { if (logScannerActivity && (ioe instanceof UnknownScannerException)) { try { HRegionLocation location = - getConnection().relocateRegion(getTableName(), scan.getStartRow()); - LOG.info("Scanner=" + scannerId + " expired, current region location is " + - location.toString()); + getConnection().relocateRegion(getTableName(), scan.getStartRow()); + LOG.info("Scanner=" + scannerId + + " expired, current region location is " + location.toString()); } catch (Throwable t) { LOG.info("Failed to relocate region", t); } @@ -357,8 +376,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); try { getStub().scan(controller, request); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } } catch (IOException e) { LOG.warn("Ignore, probably already closed", e); @@ -368,8 +387,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected long openScanner() throws IOException { incRPCcallsMetrics(); - ScanRequest request = RequestConverter.buildScanRequest( - getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); + ScanRequest request = + RequestConverter.buildScanRequest( + getLocation().getRegionInfo().getRegionName(), + this.scan, 0, false); try { ScanResponse response = getStub().scan(controller, request); long id = response.getScannerId(); @@ -378,8 +399,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { + " on region " + getLocation().toString()); } return id; - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } } @@ -422,6 +443,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { return caching; } + @Override + public ClusterConnection getConnection() { + return cConnection; + } + /** * Set the number of rows that will be fetched on next * @param caching the number of rows for caching @@ -462,4 +488,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) { this.serverHasMoreResultsContext = serverHasMoreResultsContext; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index d6896e1..7b1547d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -22,9 +22,6 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -41,35 +38,41 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.token.Token; +import com.google.protobuf.ServiceException; + /** * Client proxy for SecureBulkLoadProtocol */ @InterfaceAudience.Private public class SecureBulkLoadClient { private Table table; - private final RpcControllerFactory rpcControllerFactory; - public SecureBulkLoadClient(final Configuration conf, Table table) { + public SecureBulkLoadClient(Table table) { this.table = table; - this.rpcControllerFactory = new RpcControllerFactory(conf); } public String prepareBulkLoad(final Connection conn) throws IOException { try { - RegionServerCallable<String> callable = new RegionServerCallable<String>(conn, - this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) { - @Override - protected String call(PayloadCarryingRpcController controller) throws Exception { - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - RegionSpecifier region = - RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); - PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder() - .setTableName(ProtobufUtil.toProtoTableName(table.getName())) - .setRegion(region).build(); - PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request); - return response.getBulkToken(); - } - }; + RegionServerCallable<String> callable = + new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) { + @Override + public String call(int callTimeout) throws IOException { + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + RegionSpecifier region = + RequestConverter + .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); + try { + PrepareBulkLoadRequest request = + PrepareBulkLoadRequest.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(table.getName())) + .setRegion(region).build(); + PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request); + return response.getBulkToken(); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { @@ -79,19 +82,24 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { try { - RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn, - this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) { - @Override - protected Void call(PayloadCarryingRpcController controller) throws Exception { - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - RegionSpecifier region = RequestConverter.buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - CleanupBulkLoadRequest request = - CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build(); - getStub().cleanupBulkLoad(null, request); - return null; - } - }; + RegionServerCallable<Void> callable = + new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) { + @Override + public Void call(int callTimeout) throws IOException { + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + RegionSpecifier region = RequestConverter.buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionName); + try { + CleanupBulkLoadRequest request = + CleanupBulkLoadRequest.newBuilder().setRegion(region) + .setBulkToken(bulkToken).build(); + getStub().cleanupBulkLoad(null, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + return null; + } + }; RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { @@ -122,12 +130,12 @@ public class SecureBulkLoadClient { try { BulkLoadHFileResponse response = client.bulkLoadHFile(null, request); return response.getLoaded(); - } catch (Exception se) { - throw ProtobufUtil.handleRemoteException(se); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); } } public Path getStagingPath(String bulkToken, byte[] family) throws IOException { return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java index a6384e3..6fae5cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -77,4 +77,5 @@ public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { } return response; } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index 6c290a6..f4f18b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -17,39 +17,24 @@ */ package org.apache.hadoop.hbase.ipc; -import java.io.IOException; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; /** * Optionally carries Cells across the proxy/service interface down into ipc. On its - * way out it optionally carries a set of result Cell data. We stick the Cells here when we want - * to avoid having to protobuf them (for performance reasons). This class is used ferrying data - * across the proxy/protobuf service chasm. Also does call timeout. Used by client and server - * ipc'ing. + * way out it optionally carries a set of result Cell data. We stick the Cells here when we want + * to avoid having to protobuf them. This class is used ferrying data across the proxy/protobuf + * service chasm. Used by client and server ipc'ing. */ @InterfaceAudience.Private -public class PayloadCarryingRpcController implements RpcController, CellScannable { - /** - * The time, in ms before the call should expire. - */ - protected volatile Integer callTimeout; - protected volatile boolean cancelled = false; - protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null); - protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null); - private IOException exception; +public class PayloadCarryingRpcController + extends TimeLimitedRpcController implements CellScannable { public static final int PRIORITY_UNSET = -1; /** @@ -108,123 +93,15 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl } /** - * @param regionName RegionName. If hbase:meta, we'll set high priority. - */ - public void setPriority(final byte [] regionName) { - if (isMetaRegion(regionName)) { - setPriority(TableName.META_TABLE_NAME); - } - } - - private static boolean isMetaRegion(final byte[] regionName) { - return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) - || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); - } - - /** * @return The priority of this request */ public int getPriority() { return priority; } - @Override - public void reset() { + @Override public void reset() { + super.reset(); priority = 0; cellScanner = null; - exception = null; - cancelled = false; - failureCb.set(null); - cancellationCb.set(null); - callTimeout = null; - } - - public int getCallTimeout() { - if (callTimeout != null) { - return callTimeout; - } else { - return 0; - } - } - - public void setCallTimeout(int callTimeout) { - this.callTimeout = callTimeout; - } - - public boolean hasCallTimeout(){ - return callTimeout != null; - } - - @Override - public String errorText() { - if (exception != null) { - return exception.getMessage(); - } else { - return null; - } - } - - /** - * For use in async rpc clients - * @return true if failed - */ - @Override - public boolean failed() { - return this.exception != null; - } - - @Override - public boolean isCanceled() { - return cancelled; - } - - @Override - public void notifyOnCancel(RpcCallback<Object> cancellationCb) { - this.cancellationCb.set(cancellationCb); - if (this.cancelled) { - cancellationCb.run(null); - } - } - - /** - * Notify a callback on error. - * For use in async rpc clients - * - * @param failureCb the callback to call on error - */ - public void notifyOnFail(RpcCallback<IOException> failureCb) { - this.failureCb.set(failureCb); - if (this.exception != null) { - failureCb.run(this.exception); - } - } - - @Override - public void setFailed(String reason) { - this.exception = new IOException(reason); - if (this.failureCb.get() != null) { - this.failureCb.get().run(this.exception); - } - } - - /** - * Set failed with an exception to pass on. - * For use in async rpc clients - * - * @param e exception to set with - */ - public void setFailed(IOException e) { - this.exception = e; - if (this.failureCb.get() != null) { - this.failureCb.get().run(this.exception); - } - } - - @Override - public void startCancel() { - cancelled = true; - if (cancellationCb.get() != null) { - cancellationCb.get().run(null); - } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0206dc67/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index dbc9041..55d6375 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -76,23 +76,30 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { Descriptors.MethodDescriptor method, Message request, Message responsePrototype) throws IOException { if (LOG.isTraceEnabled()) { - LOG.trace("Call: " + method.getName() + ", " + request.toString()); + LOG.trace("Call: "+method.getName()+", "+request.toString()); } + if (row == null) { throw new IllegalArgumentException("Missing row property for remote region location"); } + + final RpcController rpcController = controller == null + ? rpcControllerFactory.newController() : controller; + final ClientProtos.CoprocessorServiceCall call = CoprocessorRpcUtils.buildServiceCall(row, method, request); RegionServerCallable<CoprocessorServiceResponse> callable = - new RegionServerCallable<CoprocessorServiceResponse>(connection, - controller == null? this.rpcControllerFactory.newController(): - (PayloadCarryingRpcController)controller, - table, row) { + new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) { @Override - protected CoprocessorServiceResponse call(PayloadCarryingRpcController controller) - throws Exception { + public CoprocessorServiceResponse call(int callTimeout) throws Exception { + if (rpcController instanceof PayloadCarryingRpcController) { + ((PayloadCarryingRpcController) rpcController).setPriority(tableName); + } + if (rpcController instanceof TimeLimitedRpcController) { + ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout); + } byte[] regionName = getLocation().getRegionInfo().getRegionName(); - return ProtobufUtil.execService(getRpcController(), getStub(), call, regionName); + return ProtobufUtil.execService(rpcController, getStub(), call, regionName); } }; CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()