http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fbd9f51..1b3e111 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -18,12 +18,6 @@ */ package org.apache.hadoop.hbase.client; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -43,7 +37,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; @@ -74,6 +67,16 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; +import com.google.common.annotations.VisibleForTesting; + +// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. +// Internally, we use shaded protobuf. This below are part of our public API. +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.ServiceException; +import com.google.protobuf.Service; +// SEE ABOVE NOTE! + /** * An implementation of {@link Table}. Used to communicate with a single HBase table. * Lightweight. Get as needed and just close when done. @@ -411,23 +414,16 @@ public class HTable implements Table { if (get.getConsistency() == Consistency.STRONG) { // Good old call. - final Get getReq = get; + final Get configuredGet = get; RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - getName(), get.getRow()) { + this.rpcControllerFactory, getName(), get.getRow()) { @Override - public Result call(int callTimeout) throws IOException { - ClientProtos.GetRequest request = - RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + protected Result call(PayloadCarryingRpcController controller) throws Exception { + ClientProtos.GetRequest request = RequestConverter.buildGetRequest( + getLocation().getRegionInfo().getRegionName(), configuredGet); + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) return null; + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); } }; return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable, @@ -443,7 +439,6 @@ public class HTable implements Table { return callable.call(operationTimeout); } - /** * {@inheritDoc} */ @@ -454,16 +449,14 @@ public class HTable implements Table { } try { Object[] r1 = new Object[gets.size()]; - batch((List) gets, r1); - - // translate. + batch((List<? extends Row>)gets, r1); + // Translate. Result [] results = new Result[r1.length]; - int i=0; - for (Object o : r1) { - // batch ensures if there is a failure we get an exception instead - results[i++] = (Result) o; + int i = 0; + for (Object obj: r1) { + // Batch ensures if there is a failure we get an exception instead + results[i++] = (Result)obj; } - return results; } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException().initCause(e); @@ -511,21 +504,13 @@ public class HTable implements Table { public void delete(final Delete delete) throws IOException { RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection, - tableName, delete.getRow()) { + this.rpcControllerFactory, getName(), delete.getRow()) { @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + protected Boolean call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); } }; rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, @@ -581,41 +566,28 @@ public class HTable implements Table { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - final RetryingTimeTracker tracker = new RetryingTimeTracker(); PayloadCarryingServerCallable<MultiResponse> callable = - new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), + new PayloadCarryingServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), rpcControllerFactory) { - @Override - public MultiResponse call(int callTimeout) throws IOException { - tracker.start(); - controller.setPriority(tableName); - int remainingTime = tracker.getRemainingTime(callTimeout); - if (remainingTime == 0) { - throw new DoNotRetryIOException("Timeout for mutate row"); - } - controller.setCallTimeout(remainingTime); - try { - RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( - getLocation().getRegionInfo().getRegionName(), rm); - regionMutationBuilder.setAtomic(true); - MultiRequest request = - MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - ClientProtos.MultiResponse response = getStub().multi(controller, request); - ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); - if (res.hasException()) { - Throwable ex = ProtobufUtil.toException(res.getException()); - if (ex instanceof IOException) { - throw (IOException) ex; - } - throw new IOException("Failed to mutate row: " + - Bytes.toStringBinary(rm.getRow()), ex); - } - return ResponseConverter.getResults(request, response, controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + @Override + protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception { + RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( + getLocation().getRegionInfo().getRegionName(), rm); + regionMutationBuilder.setAtomic(true); + MultiRequest request = + MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + ClientProtos.MultiResponse response = getStub().multi(controller, request); + ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); + if (res.hasException()) { + Throwable ex = ProtobufUtil.toException(res.getException()); + if (ex instanceof IOException) { + throw (IOException) ex; } + throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex); } - }; + return ResponseConverter.getResults(request, response, controller.cellScanner()); + } + }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), null, null, callable, operationTimeout); ars.waitUntilDone(); @@ -624,38 +596,31 @@ public class HTable implements Table { } } + private static void checkHasFamilies(final Mutation mutation) throws IOException { + if (mutation.numFamilies() == 0) { + throw new IOException("Invalid arguments to " + mutation + ", zero columns specified"); + } + } + /** * {@inheritDoc} */ @Override public Result append(final Append append) throws IOException { - if (append.numFamilies() == 0) { - throw new IOException( - "Invalid arguments to append, no columns specified"); - } - - NonceGenerator ng = this.connection.getNonceGenerator(); - final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); - RegionServerCallable<Result> callable = - new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) { - @Override - public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, - this.operationTimeout); + checkHasFamilies(append); + RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, + this.rpcControllerFactory, getName(), append.getRow()) { + @Override + protected Result call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNewNonce()); + MutateResponse response = getStub().mutate(controller, request); + if (!response.hasResult()) return null; + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + } + }; + return rpcCallerFactory.<Result> newCaller(this.rpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** @@ -663,27 +628,16 @@ public class HTable implements Table { */ @Override public Result increment(final Increment increment) throws IOException { - if (!increment.hasFamilies()) { - throw new IOException( - "Invalid arguments to increment, no columns specified"); - } - NonceGenerator ng = this.connection.getNonceGenerator(); - final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); + checkHasFamilies(increment); RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - getName(), increment.getRow()) { + this.rpcControllerFactory, getName(), increment.getRow()) { @Override - public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + protected Result call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNewNonce()); + MutateResponse response = getStub().mutate(controller, request); + // Should this check for null like append does? + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); } }; return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, @@ -722,28 +676,20 @@ public class HTable implements Table { NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); - RegionServerCallable<Long> callable = - new RegionServerCallable<Long>(connection, getName(), row) { - @Override - public Long call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildIncrementRequest( - getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - Result result = - ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable, - this.operationTimeout); + RegionServerCallable<Long> callable = new RegionServerCallable<Long>(this.connection, + this.rpcControllerFactory, getName(), row) { + @Override + protected Long call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildIncrementRequest( + getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, amount, durability, nonceGroup, nonce); + MutateResponse response = getStub().mutate(controller, request); + Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); + } + }; + return rpcCallerFactory.<Long> newCaller(rpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** @@ -754,26 +700,19 @@ public class HTable implements Table { final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { - RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, put); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, - this.operationTimeout); + RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, + this.rpcControllerFactory, getName(), row) { + @Override + protected Boolean call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), CompareType.EQUAL, put); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } + }; + return rpcCallerFactory.<Boolean> newCaller(rpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** @@ -784,57 +723,42 @@ public class HTable implements Table { final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException { - RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, put); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, - this.operationTimeout); + RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, + this.rpcControllerFactory, getName(), row) { + @Override + protected Boolean call(PayloadCarryingRpcController controller) throws Exception { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, put); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } + }; + return rpcCallerFactory.<Boolean> newCaller(rpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public boolean checkAndDelete(final byte [] row, - final byte [] family, final byte [] qualifier, final byte [] value, - final Delete delete) + public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Delete delete) throws IOException { - RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, - this.operationTimeout); + RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, + this.rpcControllerFactory, getName(), row) { + @Override + protected Boolean call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), CompareType.EQUAL, delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } + }; + return rpcCallerFactory.<Boolean> newCaller(rpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** @@ -845,25 +769,18 @@ public class HTable implements Table { final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException { - RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; + RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(this.connection, + this.rpcControllerFactory, getName(), row) { + @Override + protected Boolean call(PayloadCarryingRpcController controller) throws Exception { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } + }; return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -875,40 +792,28 @@ public class HTable implements Table { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { - final RetryingTimeTracker tracker = new RetryingTimeTracker(); PayloadCarryingServerCallable<MultiResponse> callable = new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), rpcControllerFactory) { @Override - public MultiResponse call(int callTimeout) throws IOException { - tracker.start(); - controller.setPriority(tableName); - int remainingTime = tracker.getRemainingTime(callTimeout); - if (remainingTime == 0) { - throw new DoNotRetryIOException("Timeout for mutate row"); - } - controller.setCallTimeout(remainingTime); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MultiRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, rm); - ClientProtos.MultiResponse response = getStub().multi(controller, request); - ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); - if (res.hasException()) { - Throwable ex = ProtobufUtil.toException(res.getException()); - if(ex instanceof IOException) { - throw (IOException)ex; - } - throw new IOException("Failed to checkAndMutate row: "+ - Bytes.toStringBinary(rm.getRow()), ex); + protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MultiRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, rm); + ClientProtos.MultiResponse response = getStub().multi(controller, request); + ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); + if (res.hasException()) { + Throwable ex = ProtobufUtil.toException(res.getException()); + if (ex instanceof IOException) { + throw (IOException)ex; } - return ResponseConverter.getResults(request, response, controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + throw new IOException("Failed to checkAndMutate row: "+ Bytes.toStringBinary(rm.getRow()), ex); } + return ResponseConverter.getResults(request, response, controller.cellScanner()); } }; + /** * Currently, we use one array to store 'processed' flag which is returned by server. * It is excessive to send such a large array, but that is required by the framework right now @@ -968,7 +873,6 @@ public class HTable implements Table { } /** - * {@inheritDoc} * @throws IOException */ void flushCommits() throws IOException { @@ -1145,19 +1049,18 @@ public class HTable implements Table { for (final byte[] r : keys) { final RegionCoprocessorRpcChannel channel = new RegionCoprocessorRpcChannel(connection, tableName, r); - Future<R> future = pool.submit( - new Callable<R>() { - @Override - public R call() throws Exception { - T instance = ProtobufUtil.newServiceStub(service, channel); - R result = callable.call(instance); - byte[] region = channel.getLastRegion(); - if (callback != null) { - callback.update(region, r, result); - } - return result; - } - }); + Future<R> future = pool.submit(new Callable<R>() { + @Override + public R call() throws Exception { + T instance = ProtobufUtil.newServiceStub(service, channel); + R result = callable.call(instance); + byte[] region = channel.getLastRegion(); + if (callback != null) { + callback.update(region, r, result); + } + return result; + } + }); futures.put(r, future); } for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { @@ -1210,9 +1113,6 @@ public class HTable implements Table { return tableName + ";" + connection; } - /** - * {@inheritDoc} - */ @Override public <R extends Message> Map<byte[], R> batchCoprocessorService( Descriptors.MethodDescriptor methodDescriptor, Message request, @@ -1221,14 +1121,13 @@ public class HTable implements Table { Bytes.BYTES_COMPARATOR)); batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, new Callback<R>() { - - @Override - public void update(byte[] region, byte[] row, R result) { - if (region != null) { - results.put(region, result); - } - } - }); + @Override + public void update(byte[] region, byte[] row, R result) { + if (region != null) { + results.put(region, result); + } + } + }); return results; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java index 66d3c21..ae62255 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java @@ -21,16 +21,24 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; + /** * A RetryingCallable for master operations. * @param <V> return type */ +// Like RegionServerCallable abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable { protected ClusterConnection connection; protected MasterKeepAliveConnection master; + private final PayloadCarryingRpcController rpcController; - public MasterCallable(final Connection connection) { + MasterCallable(final Connection connection, + final RpcControllerFactory rpcConnectionFactory) { this.connection = (ClusterConnection) connection; + this.rpcController = rpcConnectionFactory.newController(); } @Override @@ -59,4 +67,31 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable { public long sleep(long pause, int tries) { return ConnectionUtils.getPauseTime(pause, tries); } + + /** + * Override that changes Exception from {@link Exception} to {@link IOException}. It also does + * setup of an rpcController and calls through to the unimplemented + * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. + */ + @Override + // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate + // and so we contain references to protobuf. We can't set priority on the rpcController as + // we do in RegionServerCallable because we don't always have a Table when we call. + public V call(int callTimeout) throws IOException { + try { + this.rpcController.setCallTimeout(callTimeout); + return call(this.rpcController); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + /** + * Run RPC call. + * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a + * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this + * class. + * @throws Exception + */ + protected abstract V call(PayloadCarryingRpcController rpcController) throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java index e445b78..47693f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java @@ -33,8 +33,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; * against the master on the MasterProtos.MasterService.BlockingInterface; but not by * final user code. Hence it's package protected. */ -interface MasterKeepAliveConnection -extends MasterProtos.MasterService.BlockingInterface { +interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface { // Do this instead of implement Closeable because closeable returning IOE is PITA. void close(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index e764ceb..a3162f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -30,8 +30,9 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -41,14 +42,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ServiceException; /** * Callable that handles the <code>multi</code> method call going against a single - * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a - * {@link RegionServerCallable} that goes against multiple regions. + * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a + * RegionServerCallable that goes against multiple regions). * @param <R> */ +@InterfaceAudience.Private class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> { private final MultiAction<R> multiAction; private final boolean cellBlock; @@ -79,7 +80,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse } @Override - public MultiResponse call(int callTimeout) throws IOException { + protected MultiResponse call(PayloadCarryingRpcController controller) throws Exception { int countOfActions = this.multiAction.size(); if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions"); MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder(); @@ -98,10 +99,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse regionActionBuilder.clear(); regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName)); - - if (this.cellBlock) { - // Presize. Presume at least a KV per Action. There are likely more. + // Pre-size. Presume at least a KV per Action. There are likely more. if (cells == null) cells = new ArrayList<CellScannable>(countOfActions); // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations. // They have already been handled above. Guess at count of cells @@ -116,18 +115,18 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse // Controller optionally carries cell data over the proxy/service boundary and also // optionally ferries cell response data back out again. - if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells)); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); + PayloadCarryingRpcController payloadCarryingRpcController = null; + if (cells != null) { + // Cast. Will fail if we have been passed wrong RpcController type. + payloadCarryingRpcController = (PayloadCarryingRpcController)controller; + payloadCarryingRpcController.setCellScanner(CellUtil.createCellScanner(cells)); + } ClientProtos.MultiResponse responseProto; ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); - try { - responseProto = getStub().multi(controller, requestProto); - } catch (ServiceException e) { - throw ProtobufUtil.getRemoteException(e); - } + responseProto = getStub().multi(controller, requestProto); if (responseProto == null) return null; // Occurs on cancel - return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner()); + return ResponseConverter.getResults(requestProto, responseProto, + payloadCarryingRpcController == null? null: payloadCarryingRpcController.cellScanner()); } /** @@ -151,4 +150,4 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse ServerName getServerName() { return location.getServerName(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java index d94f069..83d857b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java @@ -16,33 +16,51 @@ */ package org.apache.hadoop.hbase.client; +import java.io.IOException; + +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; /** - * This class is used to unify HTable calls with AsyncProcess Framework. - * HTable can use AsyncProcess directly though this class. + * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use + * AsyncProcess directly though this class. Also adds global timeout tracking on top of + * RegionServerCallable and implements Cancellable. */ @InterfaceAudience.Private -public abstract class PayloadCarryingServerCallable<T> - extends RegionServerCallable<T> implements Cancellable { - protected PayloadCarryingRpcController controller; +abstract class PayloadCarryingServerCallable<T> extends RegionServerCallable<T> + implements Cancellable { + private final RetryingTimeTracker tracker = new RetryingTimeTracker(); + + PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, + RpcControllerFactory rpcControllerFactory) { + super(connection, rpcControllerFactory, tableName, row); + } - public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, - RpcControllerFactory rpcControllerFactory) { - super(connection, tableName, row); - this.controller = rpcControllerFactory.newController(); + /* Override so can mess with the callTimeout. + * (non-Javadoc) + * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int) + */ + @Override + public T call(int callTimeout) throws IOException { + // It is expected (it seems) that tracker.start can be called multiple times (on each trip + // through the call when retrying). Also, we can call start and no need of a stop. + this.tracker.start(); + int remainingTime = tracker.getRemainingTime(callTimeout); + if (remainingTime == 0) { + throw new DoNotRetryIOException("Timeout for mutate row"); + } + return super.call(remainingTime); } @Override public void cancel() { - controller.startCancel(); + getRpcController().startCancel(); } @Override public boolean isCancelled() { - return controller.isCanceled(); + return getRpcController().isCanceled(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java index 54c93a0..4e347dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java @@ -27,31 +27,30 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.util.Bytes; /** - * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable + * Similar to RegionServerCallable but for the AdminService interface. This service callable * assumes a Table and row and thus does region locating similar to RegionServerCallable. + * Works against Admin stub rather than Client stub. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", justification="stub used by ipc") @InterfaceAudience.Private public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> { - - protected final ClusterConnection connection; - - protected final RpcControllerFactory rpcControllerFactory; - protected AdminService.BlockingInterface stub; + protected final RpcControllerFactory rpcControllerFactory; + private PayloadCarryingRpcController controller = null; + protected final ClusterConnection connection; protected HRegionLocation location; - protected final TableName tableName; protected final byte[] row; protected final int replicaId; - protected final static int MIN_WAIT_DEAD_SERVER = 10000; public RegionAdminServiceCallable(ClusterConnection connection, @@ -82,16 +81,13 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable< if (Thread.interrupted()) { throw new InterruptedIOException(); } - if (reload || location == null) { location = getLocation(!reload); } - if (location == null) { // With this exception, there will be a retry. throw new HBaseIOException(getExceptionMessage()); } - this.setStub(connection.getAdmin(location.getServerName())); } @@ -167,7 +163,39 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable< if (rl == null) { throw new RetriesExhaustedException("Can't get the locations"); } - return rl; } -} + + /** + * Override that changes Exception from {@link Exception} to {@link IOException}. It also does + * setup of an rpcController and calls through to the unimplemented + * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. + */ + @Override + // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate + // and so we contain references to protobuf. We can't set priority on the rpcController as + // we do in RegionServerCallable because we don't always have a Table when we call. + public T call(int callTimeout) throws IOException { + this.controller = rpcControllerFactory.newController(); + this.controller.setPriority(this.tableName); + this.controller.setCallTimeout(callTimeout); + try { + return call(this.controller); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() { + return this.controller; + } + + /** + * Run RPC call. + * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a + * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this + * class. + * @throws Exception + */ + protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index d878bae..861b375 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -24,12 +23,20 @@ import java.io.IOException; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; /** - * Implementations call a RegionServer and implement {@link #call(int)}. - * Passed to a {@link RpcRetryingCaller} so we retry on fail. - * TODO: this class is actually tied to one region, because most of the paths make use of + * Implementations make an rpc call against a RegionService via a protobuf Service. + * Implement #rpcCall(RpcController) and then call {@link #call(int)} to + * trigger the rpc. The {@link #call(int)} eventually invokes your + * #rpcCall(RpcController) meanwhile saving you having to write a bunch of + * boilerplate. The {@link #call(int)} implementation is from {@link RpcRetryingCaller} so rpcs are + * retried on fail. + * + * <p>TODO: this class is actually tied to one region, because most of the paths make use of * the regioninfo part of location when building requests. The only reason it works for * multi-region requests (e.g. batch) is that they happen to not use the region parts. * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, @@ -37,18 +44,27 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; * @param <T> the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements - RetryingCallable<T> { - +public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> { private ClientService.BlockingInterface stub; + private final PayloadCarryingRpcController rpcController; /** * @param connection Connection to use. * @param tableName Table name to which <code>row</code> belongs. * @param row The row we want in <code>tableName</code>. */ - public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { + public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, + TableName tableName, byte [] row) { + this(connection, rpcControllerFactory.newController(), tableName, row); + } + + public RegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController, + TableName tableName, byte [] row) { super(connection, tableName, row); + this.rpcController = rpcController; + if (this.rpcController != null) { + this.rpcController.setPriority(tableName); + } } void setClientByServiceName(ServerName service) throws IOException { @@ -69,4 +85,42 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab void setStub(final ClientService.BlockingInterface stub) { this.stub = stub; } -} + + /** + * Override that changes Exception from {@link Exception} to {@link IOException}. It also does + * setup of an rpcController and calls through to the unimplemented + * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. + */ + @Override + public T call(int callTimeout) throws IOException { + if (this.rpcController != null) { + this.rpcController.setCallTimeout(callTimeout); + } + try { + return call(this.rpcController); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + /** + * Run RPC call. + * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a + * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this + * class. + * @throws Exception + */ + protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; + + public PayloadCarryingRpcController getRpcController() { + return this.rpcController; + } + + long getNonceGroup() { + return getConnection().getNonceGenerator().getNonceGroup(); + } + + long getNewNonce() { + return getConnection().getNonceGenerator().newNonce(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java index 24288e6..b9438e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; * Tracks the amount of time remaining for an operation. */ class RetryingTimeTracker { - private long globalStartTime = -1; public void start() { @@ -38,16 +37,19 @@ class RetryingTimeTracker { if (callTimeout == Integer.MAX_VALUE) { return Integer.MAX_VALUE; } - int remainingTime = (int) ( - callTimeout - - (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); + long remaining = EnvironmentEdgeManager.currentTime() - this.globalStartTime; + long remainingTime = callTimeout - remaining; if (remainingTime < 1) { // If there is no time left, we're trying anyway. It's too late. // 0 means no timeout, and it's not the intent here. So we secure both cases by // resetting to the minimum. remainingTime = 1; } - return remainingTime; + if (remainingTime > Integer.MAX_VALUE) { + throw new RuntimeException("remainingTime=" + remainingTime + + " which is > Integer.MAX_VALUE"); + } + return (int)remainingTime; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index 0c2d345..644337d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -176,9 +176,9 @@ public class ReversedScannerCallable extends ScannerCallable { @Override public ScannerCallable getScannerCallableForReplica(int id) { - ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName, + ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName, this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id); r.setCaching(this.getCaching()); return r; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java new file mode 100644 index 0000000..68a4aa2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; + +/** + * A RetryingCallable for RPC connection operations. + * @param <V> return type + */ +abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable { + @Override + public void prepare(boolean reload) throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public void throwable(Throwable t, boolean retrying) { + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return ""; + } + + @Override + public long sleep(long pause, int tries) { + return ConnectionUtils.getPauseTime(pause, tries); + } + + @Override + // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate + // and so we contain references to protobuf. + public V call(int callTimeout) throws IOException { + try { + return rpcCall(callTimeout); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + protected abstract V rpcCall(int callTimeout) throws Exception; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index b4cd2ef..2b2e4c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -22,9 +22,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import java.io.IOException; -/** - * - */ @InterfaceAudience.Public @InterfaceStability.Evolving public interface RpcRetryingCaller<T> { @@ -52,4 +49,4 @@ public interface RpcRetryingCaller<T> { */ T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException; -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 1c723c5..f92aeae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -36,6 +36,7 @@ public class RpcRetryingCallerFactory { private final int rpcTimeout; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; + /* These below data members are UNUSED!!!*/ private final boolean enableBackPressure; private ServerStatisticTracker stats; http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 65dbb10..2785648 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -29,8 +29,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.protobuf.ServiceException; - /** * Caller that goes to replica if the primary region does no answer within a configurable @@ -57,8 +53,6 @@ import com.google.protobuf.ServiceException; */ @InterfaceAudience.Private public class RpcRetryingCallerWithReadReplicas { - private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class); - protected final ExecutorService pool; protected final ClusterConnection cConnection; protected final Configuration conf; @@ -98,7 +92,7 @@ public class RpcRetryingCallerWithReadReplicas { private final PayloadCarryingRpcController controller; public ReplicaRegionServerCallable(int id, HRegionLocation location) { - super(RpcRetryingCallerWithReadReplicas.this.cConnection, + super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); this.id = id; this.location = location; @@ -141,28 +135,20 @@ public class RpcRetryingCallerWithReadReplicas { } @Override - public Result call(int callTimeout) throws Exception { + protected Result call(PayloadCarryingRpcController controller) throws Exception { if (controller.isCanceled()) return null; - if (Thread.interrupted()) { throw new InterruptedIOException(); } - byte[] reg = location.getRegionInfo().getRegionName(); - ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); controller.setCallTimeout(callTimeout); - - try { - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) { - return null; - } - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) { + return null; } + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 72d69ec..1689d11 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -52,9 +52,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; -import com.google.protobuf.ServiceException; -import com.google.protobuf.TextFormat; - /** * Scanner operations such as create, next, etc. * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as @@ -74,7 +71,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected boolean renew = false; private Scan scan; private int caching = 1; - protected final ClusterConnection cConnection; protected ScanMetrics scanMetrics; private boolean logScannerActivity = false; private int logCutOffLatency = 1000; @@ -125,9 +121,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { */ public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { - super(connection, tableName, scan.getStartRow()); + super(connection, rpcControllerFactory, tableName, scan.getStartRow()); this.id = id; - this.cConnection = connection; this.scan = scan; this.scanMetrics = scanMetrics; Configuration conf = connection.getConfiguration(); @@ -185,25 +180,16 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { } } - - @Override - public Result [] call(int callTimeout) throws IOException { + protected Result [] call(PayloadCarryingRpcController controller) throws Exception { if (Thread.interrupted()) { throw new InterruptedIOException(); } - - if (controller == null) { - controller = controllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - } - - if (closed) { - if (scannerId != -1) { + if (this.closed) { + if (this.scannerId != -1) { close(); } } else { - if (scannerId == -1L) { + if (this.scannerId == -1L) { this.scannerId = openScanner(); } else { Result [] rrs = null; @@ -212,61 +198,56 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { setHeartbeatMessage(false); try { incRPCcallsMetrics(); - request = - RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, + request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew); ScanResponse response = null; - try { - response = getStub().scan(controller, request); - // Client and RS maintain a nextCallSeq number during the scan. Every next() call - // from client to server will increment this number in both sides. Client passes this - // number along with the request and at RS side both the incoming nextCallSeq and its - // nextCallSeq will be matched. In case of a timeout this increment at the client side - // should not happen. If at the server side fetching of next batch of data was over, - // there will be mismatch in the nextCallSeq number. Server will throw - // OutOfOrderScannerNextException and then client will reopen the scanner with startrow - // as the last successfully retrieved row. - // See HBASE-5974 - nextCallSeq++; - long timestamp = System.currentTimeMillis(); - setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); - // Results are returned via controller - CellScanner cellScanner = controller.cellScanner(); - rrs = ResponseConverter.getResults(cellScanner, response); - if (logScannerActivity) { - long now = System.currentTimeMillis(); - if (now - timestamp > logCutOffLatency) { - int rows = rrs == null ? 0 : rrs.length; - LOG.info("Took " + (now-timestamp) + "ms to fetch " + response = getStub().scan(controller, request); + // Client and RS maintain a nextCallSeq number during the scan. Every next() call + // from client to server will increment this number in both sides. Client passes this + // number along with the request and at RS side both the incoming nextCallSeq and its + // nextCallSeq will be matched. In case of a timeout this increment at the client side + // should not happen. If at the server side fetching of next batch of data was over, + // there will be mismatch in the nextCallSeq number. Server will throw + // OutOfOrderScannerNextException and then client will reopen the scanner with startrow + // as the last successfully retrieved row. + // See HBASE-5974 + nextCallSeq++; + long timestamp = System.currentTimeMillis(); + setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); + // Results are returned via controller + CellScanner cellScanner = controller.cellScanner(); + rrs = ResponseConverter.getResults(cellScanner, response); + if (logScannerActivity) { + long now = System.currentTimeMillis(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 0 : rrs.length; + LOG.info("Took " + (now-timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId); - } - } - updateServerSideMetrics(response); - // moreResults is only used for the case where a filter exhausts all elements - if (response.hasMoreResults() && !response.getMoreResults()) { - scannerId = -1L; - closed = true; - // Implied that no results were returned back, either. - return null; } - // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due - // to size or quantity of results in the response. - if (response.hasMoreResultsInRegion()) { - // Set what the RS said - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - // Server didn't respond whether it has more results or not. - setHasMoreResultsContext(false); - } - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + } + updateServerSideMetrics(response); + // moreResults is only used for the case where a filter exhausts all elements + if (response.hasMoreResults() && !response.getMoreResults()) { + this.scannerId = -1L; + this.closed = true; + // Implied that no results were returned back, either. + return null; + } + // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due + // to size or quantity of results in the response. + if (response.hasMoreResultsInRegion()) { + // Set what the RS said + setHasMoreResultsContext(true); + setServerHasMoreResults(response.getMoreResultsInRegion()); + } else { + // Server didn't respond whether it has more results or not. + setHasMoreResultsContext(false); } updateResultsMetrics(rrs); } catch (IOException e) { if (logScannerActivity) { - LOG.info("Got exception making request " + TextFormat.shortDebugString(request) - + " to " + getLocation(), e); + LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " + + getLocation(), e); } IOException ioe = e; if (e instanceof RemoteException) { @@ -275,9 +256,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { if (logScannerActivity && (ioe instanceof UnknownScannerException)) { try { HRegionLocation location = - getConnection().relocateRegion(getTableName(), scan.getStartRow()); - LOG.info("Scanner=" + scannerId - + " expired, current region location is " + location.toString()); + getConnection().relocateRegion(getTableName(), scan.getStartRow()); + LOG.info("Scanner=" + scannerId + " expired, current region location is " + + location.toString()); } catch (Throwable t) { LOG.info("Failed to relocate region", t); } @@ -376,8 +357,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); try { getStub().scan(controller, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); } } catch (IOException e) { LOG.warn("Ignore, probably already closed", e); @@ -387,10 +368,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected long openScanner() throws IOException { incRPCcallsMetrics(); - ScanRequest request = - RequestConverter.buildScanRequest( - getLocation().getRegionInfo().getRegionName(), - this.scan, 0, false); + ScanRequest request = RequestConverter.buildScanRequest( + getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); try { ScanResponse response = getStub().scan(controller, request); long id = response.getScannerId(); @@ -399,8 +378,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { + " on region " + getLocation().toString()); } return id; - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); } } @@ -443,11 +422,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { return caching; } - @Override - public ClusterConnection getConnection() { - return cConnection; - } - /** * Set the number of rows that will be fetched on next * @param caching the number of rows for caching @@ -488,4 +462,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) { this.serverHasMoreResultsContext = serverHasMoreResultsContext; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index 7b1547d..d6896e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -22,6 +22,9 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -38,41 +41,35 @@ import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.security.token.Token; -import com.google.protobuf.ServiceException; - /** * Client proxy for SecureBulkLoadProtocol */ @InterfaceAudience.Private public class SecureBulkLoadClient { private Table table; + private final RpcControllerFactory rpcControllerFactory; - public SecureBulkLoadClient(Table table) { + public SecureBulkLoadClient(final Configuration conf, Table table) { this.table = table; + this.rpcControllerFactory = new RpcControllerFactory(conf); } public String prepareBulkLoad(final Connection conn) throws IOException { try { - RegionServerCallable<String> callable = - new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) { - @Override - public String call(int callTimeout) throws IOException { - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - RegionSpecifier region = - RequestConverter - .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); - try { - PrepareBulkLoadRequest request = - PrepareBulkLoadRequest.newBuilder() - .setTableName(ProtobufUtil.toProtoTableName(table.getName())) - .setRegion(region).build(); - PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request); - return response.getBulkToken(); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; + RegionServerCallable<String> callable = new RegionServerCallable<String>(conn, + this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) { + @Override + protected String call(PayloadCarryingRpcController controller) throws Exception { + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + RegionSpecifier region = + RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); + PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(table.getName())) + .setRegion(region).build(); + PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request); + return response.getBulkToken(); + } + }; return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { @@ -82,24 +79,19 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { try { - RegionServerCallable<Void> callable = - new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) { - @Override - public Void call(int callTimeout) throws IOException { - byte[] regionName = getLocation().getRegionInfo().getRegionName(); - RegionSpecifier region = RequestConverter.buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - try { - CleanupBulkLoadRequest request = - CleanupBulkLoadRequest.newBuilder().setRegion(region) - .setBulkToken(bulkToken).build(); - getStub().cleanupBulkLoad(null, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - return null; - } - }; + RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn, + this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) { + @Override + protected Void call(PayloadCarryingRpcController controller) throws Exception { + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + RegionSpecifier region = RequestConverter.buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionName); + CleanupBulkLoadRequest request = + CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build(); + getStub().cleanupBulkLoad(null, request); + return null; + } + }; RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE); } catch (Throwable throwable) { @@ -130,12 +122,12 @@ public class SecureBulkLoadClient { try { BulkLoadHFileResponse response = client.bulkLoadHFile(null, request); return response.getLoaded(); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + } catch (Exception se) { + throw ProtobufUtil.handleRemoteException(se); } } public Path getStagingPath(String bulkToken, byte[] family) throws IOException { return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java index 6fae5cb..a6384e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/MasterCoprocessorRpcChannel.java @@ -77,5 +77,4 @@ public class MasterCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { } return response; } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index f4f18b3..6c290a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -17,24 +17,39 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; /** * Optionally carries Cells across the proxy/service interface down into ipc. On its - * way out it optionally carries a set of result Cell data. We stick the Cells here when we want - * to avoid having to protobuf them. This class is used ferrying data across the proxy/protobuf - * service chasm. Used by client and server ipc'ing. + * way out it optionally carries a set of result Cell data. We stick the Cells here when we want + * to avoid having to protobuf them (for performance reasons). This class is used ferrying data + * across the proxy/protobuf service chasm. Also does call timeout. Used by client and server + * ipc'ing. */ @InterfaceAudience.Private -public class PayloadCarryingRpcController - extends TimeLimitedRpcController implements CellScannable { +public class PayloadCarryingRpcController implements RpcController, CellScannable { + /** + * The time, in ms before the call should expire. + */ + protected volatile Integer callTimeout; + protected volatile boolean cancelled = false; + protected final AtomicReference<RpcCallback<Object>> cancellationCb = new AtomicReference<>(null); + protected final AtomicReference<RpcCallback<IOException>> failureCb = new AtomicReference<>(null); + private IOException exception; public static final int PRIORITY_UNSET = -1; /** @@ -93,15 +108,123 @@ public class PayloadCarryingRpcController } /** + * @param regionName RegionName. If hbase:meta, we'll set high priority. + */ + public void setPriority(final byte [] regionName) { + if (isMetaRegion(regionName)) { + setPriority(TableName.META_TABLE_NAME); + } + } + + private static boolean isMetaRegion(final byte[] regionName) { + return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); + } + + /** * @return The priority of this request */ public int getPriority() { return priority; } - @Override public void reset() { - super.reset(); + @Override + public void reset() { priority = 0; cellScanner = null; + exception = null; + cancelled = false; + failureCb.set(null); + cancellationCb.set(null); + callTimeout = null; + } + + public int getCallTimeout() { + if (callTimeout != null) { + return callTimeout; + } else { + return 0; + } + } + + public void setCallTimeout(int callTimeout) { + this.callTimeout = callTimeout; + } + + public boolean hasCallTimeout(){ + return callTimeout != null; + } + + @Override + public String errorText() { + if (exception != null) { + return exception.getMessage(); + } else { + return null; + } + } + + /** + * For use in async rpc clients + * @return true if failed + */ + @Override + public boolean failed() { + return this.exception != null; + } + + @Override + public boolean isCanceled() { + return cancelled; + } + + @Override + public void notifyOnCancel(RpcCallback<Object> cancellationCb) { + this.cancellationCb.set(cancellationCb); + if (this.cancelled) { + cancellationCb.run(null); + } + } + + /** + * Notify a callback on error. + * For use in async rpc clients + * + * @param failureCb the callback to call on error + */ + public void notifyOnFail(RpcCallback<IOException> failureCb) { + this.failureCb.set(failureCb); + if (this.exception != null) { + failureCb.run(this.exception); + } + } + + @Override + public void setFailed(String reason) { + this.exception = new IOException(reason); + if (this.failureCb.get() != null) { + this.failureCb.get().run(this.exception); + } + } + + /** + * Set failed with an exception to pass on. + * For use in async rpc clients + * + * @param e exception to set with + */ + public void setFailed(IOException e) { + this.exception = e; + if (this.failureCb.get() != null) { + this.failureCb.get().run(this.exception); + } + } + + @Override + public void startCancel() { + cancelled = true; + if (cancellationCb.get() != null) { + cancellationCb.get().run(null); + } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/ed87a81b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java index 55d6375..dbc9041 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java @@ -76,30 +76,23 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel { Descriptors.MethodDescriptor method, Message request, Message responsePrototype) throws IOException { if (LOG.isTraceEnabled()) { - LOG.trace("Call: "+method.getName()+", "+request.toString()); + LOG.trace("Call: " + method.getName() + ", " + request.toString()); } - if (row == null) { throw new IllegalArgumentException("Missing row property for remote region location"); } - - final RpcController rpcController = controller == null - ? rpcControllerFactory.newController() : controller; - final ClientProtos.CoprocessorServiceCall call = CoprocessorRpcUtils.buildServiceCall(row, method, request); RegionServerCallable<CoprocessorServiceResponse> callable = - new RegionServerCallable<CoprocessorServiceResponse>(connection, table, row) { + new RegionServerCallable<CoprocessorServiceResponse>(connection, + controller == null? this.rpcControllerFactory.newController(): + (PayloadCarryingRpcController)controller, + table, row) { @Override - public CoprocessorServiceResponse call(int callTimeout) throws Exception { - if (rpcController instanceof PayloadCarryingRpcController) { - ((PayloadCarryingRpcController) rpcController).setPriority(tableName); - } - if (rpcController instanceof TimeLimitedRpcController) { - ((TimeLimitedRpcController) rpcController).setCallTimeout(callTimeout); - } + protected CoprocessorServiceResponse call(PayloadCarryingRpcController controller) + throws Exception { byte[] regionName = getLocation().getRegionInfo().getRegionName(); - return ProtobufUtil.execService(rpcController, getStub(), call, regionName); + return ProtobufUtil.execService(getRpcController(), getStub(), call, regionName); } }; CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()