http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 882e21b..d2423b3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -18,12 +18,6 @@ */ package org.apache.hadoop.hbase.client; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; - import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -43,7 +37,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; @@ -74,6 +67,15 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; +import com.google.common.annotations.VisibleForTesting; +// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY. +// Internally, we use shaded protobuf. This below are part of our public API. +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; +// SEE ABOVE NOTE! + /** * An implementation of {@link Table}. Used to communicate with a single HBase table. * Lightweight. Get as needed and just close when done. @@ -416,23 +418,16 @@ public class HTable implements Table { if (get.getConsistency() == Consistency.STRONG) { // Good old call. - final Get getReq = get; + final Get configuredGet = get; RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - getName(), get.getRow()) { + this.rpcControllerFactory, getName(), get.getRow()) { @Override - public Result call(int callTimeout) throws IOException { - ClientProtos.GetRequest request = - RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + protected Result rpcCall() throws Exception { + ClientProtos.GetRequest request = RequestConverter.buildGetRequest( + getLocation().getRegionInfo().getRegionName(), configuredGet); + ClientProtos.GetResponse response = getStub().get(getRpcController(), request); + if (response == null) return null; + return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); } }; return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable, @@ -448,7 +443,6 @@ public class HTable implements Table { return callable.call(operationTimeout); } - /** * {@inheritDoc} */ @@ -459,16 +453,14 @@ public class HTable implements Table { } try { Object[] r1 = new Object[gets.size()]; - batch((List) gets, r1); - - // translate. + batch((List<? extends Row>)gets, r1); + // Translate. Result [] results = new Result[r1.length]; - int i=0; - for (Object o : r1) { - // batch ensures if there is a failure we get an exception instead - results[i++] = (Result) o; + int i = 0; + for (Object obj: r1) { + // Batch ensures if there is a failure we get an exception instead + results[i++] = (Result)obj; } - return results; } catch (InterruptedException e) { throw (InterruptedIOException)new InterruptedIOException().initCause(e); @@ -516,21 +508,13 @@ public class HTable implements Table { public void delete(final Delete delete) throws IOException { RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection, - tableName, delete.getRow()) { + this.rpcControllerFactory, getName(), delete.getRow()) { @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + protected Boolean rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), delete); + MutateResponse response = getStub().mutate(getRpcController(), request); + return Boolean.valueOf(response.getProcessed()); } }; rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, @@ -586,41 +570,28 @@ public class HTable implements Table { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - final RetryingTimeTracker tracker = new RetryingTimeTracker(); - PayloadCarryingServerCallable<MultiResponse> callable = - new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), + CancellableRegionServerCallable<MultiResponse> callable = + new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), rpcControllerFactory) { - @Override - public MultiResponse call(int callTimeout) throws IOException { - tracker.start(); - controller.setPriority(tableName); - int remainingTime = tracker.getRemainingTime(callTimeout); - if (remainingTime == 0) { - throw new DoNotRetryIOException("Timeout for mutate row"); - } - controller.setCallTimeout(remainingTime); - try { - RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( - getLocation().getRegionInfo().getRegionName(), rm); - regionMutationBuilder.setAtomic(true); - MultiRequest request = - MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - ClientProtos.MultiResponse response = getStub().multi(controller, request); - ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); - if (res.hasException()) { - Throwable ex = ProtobufUtil.toException(res.getException()); - if (ex instanceof IOException) { - throw (IOException) ex; - } - throw new IOException("Failed to mutate row: " + - Bytes.toStringBinary(rm.getRow()), ex); - } - return ResponseConverter.getResults(request, response, controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + @Override + protected MultiResponse rpcCall() throws Exception { + RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( + getLocation().getRegionInfo().getRegionName(), rm); + regionMutationBuilder.setAtomic(true); + MultiRequest request = + MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request); + ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); + if (res.hasException()) { + Throwable ex = ProtobufUtil.toException(res.getException()); + if (ex instanceof IOException) { + throw (IOException) ex; } + throw new IOException("Failed to mutate row: " + Bytes.toStringBinary(rm.getRow()), ex); } - }; + return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); + } + }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), null, null, callable, operationTimeout); ars.waitUntilDone(); @@ -629,38 +600,32 @@ public class HTable implements Table { } } + private static void checkHasFamilies(final Mutation mutation) throws IOException { + if (mutation.numFamilies() == 0) { + throw new IOException("Invalid arguments to " + mutation + ", zero columns specified"); + } + } + /** * {@inheritDoc} */ @Override public Result append(final Append append) throws IOException { - if (append.numFamilies() == 0) { - throw new IOException( - "Invalid arguments to append, no columns specified"); - } - - NonceGenerator ng = this.connection.getNonceGenerator(); - final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); - RegionServerCallable<Result> callable = - new RegionServerCallable<Result>(this.connection, getName(), append.getRow()) { - @Override - public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable, - this.operationTimeout); + checkHasFamilies(append); + NoncedRegionServerCallable<Result> callable = + new NoncedRegionServerCallable<Result>(this.connection, + this.rpcControllerFactory, getName(), append.getRow()) { + @Override + protected Result call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(controller, request); + if (!response.hasResult()) return null; + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + } + }; + return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** @@ -668,27 +633,17 @@ public class HTable implements Table { */ @Override public Result increment(final Increment increment) throws IOException { - if (!increment.hasFamilies()) { - throw new IOException( - "Invalid arguments to increment, no columns specified"); - } - NonceGenerator ng = this.connection.getNonceGenerator(); - final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); - RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, - getName(), increment.getRow()) { + checkHasFamilies(increment); + NoncedRegionServerCallable<Result> callable = + new NoncedRegionServerCallable<Result>(this.connection, + this.rpcControllerFactory, getName(), increment.getRow()) { @Override - public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } + protected Result call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(controller, request); + // Should this check for null like append does? + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); } }; return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable, @@ -725,30 +680,21 @@ public class HTable implements Table { "Invalid arguments to incrementColumnValue", npe); } - NonceGenerator ng = this.connection.getNonceGenerator(); - final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); - RegionServerCallable<Long> callable = - new RegionServerCallable<Long>(connection, getName(), row) { - @Override - public Long call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildIncrementRequest( - getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - Result result = - ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable, - this.operationTimeout); + NoncedRegionServerCallable<Long> callable = + new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(), + row) { + @Override + protected Long call(PayloadCarryingRpcController controller) throws Exception { + MutateRequest request = RequestConverter.buildIncrementRequest( + getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, amount, durability, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(controller, request); + Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); + } + }; + return rpcCallerFactory.<Long> newCaller(this.writeRpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** @@ -760,25 +706,19 @@ public class HTable implements Table { final Put put) throws IOException { RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, put); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, - this.operationTimeout); + new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory, + getName(), row) { + @Override + protected Boolean rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), CompareType.EQUAL, put); + MutateResponse response = getStub().mutate(getRpcController(), request); + return Boolean.valueOf(response.getProcessed()); + } + }; + return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** @@ -790,56 +730,43 @@ public class HTable implements Table { final Put put) throws IOException { RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, put); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, - this.operationTimeout); + new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory, + getName(), row) { + @Override + protected Boolean rpcCall() throws Exception { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, put); + MutateResponse response = getStub().mutate(getRpcController(), request); + return Boolean.valueOf(response.getProcessed()); + } + }; + return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public boolean checkAndDelete(final byte [] row, - final byte [] family, final byte [] qualifier, final byte [] value, - final Delete delete) + public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Delete delete) throws IOException { RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, - this.operationTimeout); + new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory, + getName(), row) { + @Override + protected Boolean rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), CompareType.EQUAL, delete); + MutateResponse response = getStub().mutate(getRpcController(), request); + return Boolean.valueOf(response.getProcessed()); + } + }; + return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout). + callWithRetries(callable, this.operationTimeout); } /** @@ -851,25 +778,19 @@ public class HTable implements Table { final Delete delete) throws IOException { RegionServerCallable<Boolean> callable = - new RegionServerCallable<Boolean>(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, + new RegionServerCallable<Boolean>(this.connection, this.rpcControllerFactory, + getName(), row) { + @Override + protected Boolean rpcCall() throws Exception { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, delete); + MutateResponse response = getStub().mutate(getRpcController(), request); + return Boolean.valueOf(response.getProcessed()); + } + }; + return rpcCallerFactory.<Boolean> newCaller(this.writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -880,40 +801,29 @@ public class HTable implements Table { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { - final RetryingTimeTracker tracker = new RetryingTimeTracker(); - PayloadCarryingServerCallable<MultiResponse> callable = - new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), + CancellableRegionServerCallable<MultiResponse> callable = + new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), rpcControllerFactory) { @Override - public MultiResponse call(int callTimeout) throws IOException { - tracker.start(); - controller.setPriority(tableName); - int remainingTime = tracker.getRemainingTime(callTimeout); - if (remainingTime == 0) { - throw new DoNotRetryIOException("Timeout for mutate row"); - } - controller.setCallTimeout(remainingTime); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MultiRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, rm); - ClientProtos.MultiResponse response = getStub().multi(controller, request); - ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); - if (res.hasException()) { - Throwable ex = ProtobufUtil.toException(res.getException()); - if(ex instanceof IOException) { - throw (IOException)ex; - } - throw new IOException("Failed to checkAndMutate row: "+ - Bytes.toStringBinary(rm.getRow()), ex); + protected MultiResponse rpcCall() throws Exception { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MultiRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, rm); + ClientProtos.MultiResponse response = getStub().multi(getRpcController(), request); + ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); + if (res.hasException()) { + Throwable ex = ProtobufUtil.toException(res.getException()); + if (ex instanceof IOException) { + throw (IOException)ex; } - return ResponseConverter.getResults(request, response, controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + throw new IOException("Failed to checkAndMutate row: "+ + Bytes.toStringBinary(rm.getRow()), ex); } + return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); } }; + /** * Currently, we use one array to store 'processed' flag which is returned by server. * It is excessive to send such a large array, but that is required by the framework right now @@ -973,7 +883,6 @@ public class HTable implements Table { } /** - * {@inheritDoc} * @throws IOException */ void flushCommits() throws IOException { @@ -1150,19 +1059,18 @@ public class HTable implements Table { for (final byte[] r : keys) { final RegionCoprocessorRpcChannel channel = new RegionCoprocessorRpcChannel(connection, tableName, r); - Future<R> future = pool.submit( - new Callable<R>() { - @Override - public R call() throws Exception { - T instance = ProtobufUtil.newServiceStub(service, channel); - R result = callable.call(instance); - byte[] region = channel.getLastRegion(); - if (callback != null) { - callback.update(region, r, result); - } - return result; - } - }); + Future<R> future = pool.submit(new Callable<R>() { + @Override + public R call() throws Exception { + T instance = ProtobufUtil.newServiceStub(service, channel); + R result = callable.call(instance); + byte[] region = channel.getLastRegion(); + if (callback != null) { + callback.update(region, r, result); + } + return result; + } + }); futures.put(r, future); } for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) { @@ -1236,9 +1144,6 @@ public class HTable implements Table { return tableName + ";" + connection; } - /** - * {@inheritDoc} - */ @Override public <R extends Message> Map<byte[], R> batchCoprocessorService( Descriptors.MethodDescriptor methodDescriptor, Message request, @@ -1247,14 +1152,13 @@ public class HTable implements Table { Bytes.BYTES_COMPARATOR)); batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, new Callback<R>() { - - @Override - public void update(byte[] region, byte[] row, R result) { - if (region != null) { - results.put(region, result); - } - } - }); + @Override + public void update(byte[] region, byte[] row, R result) { + if (region != null) { + results.put(region, result); + } + } + }); return results; }
http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java index 66d3c21..8c4da68 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterCallable.java @@ -21,16 +21,34 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.util.Bytes; + /** - * A RetryingCallable for master operations. + * A RetryingCallable for Master RPC operations. + * Implement the #rpcCall method. It will be retried on error. See its javadoc and the javadoc of + * #call(int). See {@link HBaseAdmin} for examples of how this is used. To get at the + * rpcController that has been created and configured to make this rpc call, use getRpcController(). + * We are trying to contain all protobuf references including references to rpcController so we + * don't pollute codebase with protobuf references; keep the protobuf references contained and only + * present in a few classes rather than all about the code base. + * <p>Like {@link RegionServerCallable} only in here, we can safely be PayloadCarryingRpcController + * all the time. This is not possible in the similar {@link RegionServerCallable} Callable because + * it has to deal with Coprocessor Endpoints. * @param <V> return type */ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable { - protected ClusterConnection connection; + protected final ClusterConnection connection; protected MasterKeepAliveConnection master; + private final PayloadCarryingRpcController rpcController; - public MasterCallable(final Connection connection) { + MasterCallable(final Connection connection, final RpcControllerFactory rpcConnectionFactory) { this.connection = (ClusterConnection) connection; + this.rpcController = rpcConnectionFactory.newController(); } @Override @@ -43,6 +61,7 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable { // The above prepare could fail but this would still be called though masterAdmin is null if (this.master != null) { this.master.close(); + this.master = null; } } @@ -59,4 +78,65 @@ abstract class MasterCallable<V> implements RetryingCallable<V>, Closeable { public long sleep(long pause, int tries) { return ConnectionUtils.getPauseTime(pause, tries); } + + /** + * Override that changes the {@link Callable#call()} Exception from {@link Exception} to + * {@link IOException}. It also does setup of an rpcController and calls through to the rpcCall() + * method which callers are expected to implement. If rpcController is an instance of + * PayloadCarryingRpcController, we will set a timeout on it. + */ + @Override + // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate + // and so we contain references to protobuf. We can't set priority on the rpcController as + // we do in RegionServerCallable because we don't always have a Table when we call. + public V call(int callTimeout) throws IOException { + try { + if (this.rpcController != null) { + this.rpcController.setCallTimeout(callTimeout); + } + return rpcCall(); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + /** + * Run the RPC call. Implement this method. To get at the rpcController that has been created + * and configured to make this rpc call, use getRpcController(). We are trying to contain + * rpcController references so we don't pollute codebase with protobuf references; keep the + * protobuf references contained and only present in a few classes rather than all about the + * code base. + * @throws Exception + */ + protected abstract V rpcCall() throws Exception; + + PayloadCarryingRpcController getRpcController() { + return this.rpcController; + } + + void setPriority(final int priority) { + if (this.rpcController != null) { + this.rpcController.setPriority(priority); + } + } + + void setPriority(final TableName tableName) { + if (this.rpcController != null) { + this.rpcController.setPriority(tableName); + } + } + + /** + * @param regionName RegionName. If hbase:meta, we'll set high priority. + */ + void setPriority(final byte [] regionName) { + if (isMetaRegion(regionName)) { + setPriority(TableName.META_TABLE_NAME); + } + } + + private static boolean isMetaRegion(final byte[] regionName) { + return Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getRegionName()) + || Bytes.equals(regionName, HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java index e445b78..47693f4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterKeepAliveConnection.java @@ -33,8 +33,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; * against the master on the MasterProtos.MasterService.BlockingInterface; but not by * final user code. Hence it's package protected. */ -interface MasterKeepAliveConnection -extends MasterProtos.MasterService.BlockingInterface { +interface MasterKeepAliveConnection extends MasterProtos.MasterService.BlockingInterface { // Do this instead of implement Closeable because closeable returning IOE is PITA. void close(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index e764ceb..1ce4aab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -30,8 +30,9 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -41,15 +42,15 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ServiceException; /** * Callable that handles the <code>multi</code> method call going against a single - * regionserver; i.e. A {@link RegionServerCallable} for the multi call (It is not a - * {@link RegionServerCallable} that goes against multiple regions. + * regionserver; i.e. A RegionServerCallable for the multi call (It is NOT a + * RegionServerCallable that goes against multiple regions). * @param <R> */ -class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> { +@InterfaceAudience.Private +class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> { private final MultiAction<R> multiAction; private final boolean cellBlock; @@ -79,7 +80,7 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse } @Override - public MultiResponse call(int callTimeout) throws IOException { + protected MultiResponse rpcCall() throws Exception { int countOfActions = this.multiAction.size(); if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions"); MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder(); @@ -98,10 +99,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse regionActionBuilder.clear(); regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName)); - - if (this.cellBlock) { - // Presize. Presume at least a KV per Action. There are likely more. + // Pre-size. Presume at least a KV per Action. There are likely more. if (cells == null) cells = new ArrayList<CellScannable>(countOfActions); // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations. // They have already been handled above. Guess at count of cells @@ -114,20 +113,13 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse multiRequestBuilder.addRegionAction(regionActionBuilder.build()); } - // Controller optionally carries cell data over the proxy/service boundary and also - // optionally ferries cell response data back out again. - if (cells != null) controller.setCellScanner(CellUtil.createCellScanner(cells)); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - ClientProtos.MultiResponse responseProto; - ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); - try { - responseProto = getStub().multi(controller, requestProto); - } catch (ServiceException e) { - throw ProtobufUtil.getRemoteException(e); + if (cells != null) { + setRpcControllerCellScanner(CellUtil.createCellScanner(cells)); } + ClientProtos.MultiRequest requestProto = multiRequestBuilder.build(); + ClientProtos.MultiResponse responseProto = getStub().multi(getRpcController(), requestProto); if (responseProto == null) return null; // Occurs on cancel - return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner()); + return ResponseConverter.getResults(requestProto, responseProto, getRpcControllerCellScanner()); } /** @@ -151,4 +143,4 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse ServerName getServerName() { return location.getServerName(); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java new file mode 100644 index 0000000..21e77bd --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; + +/** + * Implementations make an rpc call against a RegionService via a protobuf Service. + * Implement #rpcCall(RpcController) and then call {@link #call(int)} to + * trigger the rpc. The {@link #call(int)} eventually invokes your + * #rpcCall(RpcController) meanwhile saving you having to write a bunch of + * boilerplate. The {@link #call(int)} implementation is from {@link RpcRetryingCaller} so rpcs are + * retried on fail. + * + * <p>TODO: this class is actually tied to one region, because most of the paths make use of + * the regioninfo part of location when building requests. The only reason it works for + * multi-region requests (e.g. batch) is that they happen to not use the region parts. + * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, + * RegionCallable and actual RegionServerCallable with ServerName. + * @param <T> the class that the ServerCallable handles + */ +@InterfaceAudience.Private +public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServerCallable<T> { + private ClientService.BlockingInterface stub; + private final PayloadCarryingRpcController rpcController; + private final long nonce; + + /** + * @param connection Connection to use. + * @param tableName Table name to which <code>row</code> belongs. + * @param row The row we want in <code>tableName</code>. + */ + public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, + TableName tableName, byte [] row) { + this(connection, rpcControllerFactory.newController(), tableName, row); + } + + public NoncedRegionServerCallable(Connection connection, PayloadCarryingRpcController rpcController, + TableName tableName, byte [] row) { + super(connection, tableName, row); + this.rpcController = rpcController; + if (this.rpcController != null) { + this.rpcController.setPriority(tableName); + } + this.nonce = getConnection().getNonceGenerator().newNonce(); + } + + void setClientByServiceName(ServerName service) throws IOException { + this.setStub(getConnection().getClient(service)); + } + + /** + * @return Client Rpc protobuf communication stub + */ + protected ClientService.BlockingInterface getStub() { + return this.stub; + } + + /** + * Set the client protobuf communication stub + * @param stub to set + */ + void setStub(final ClientService.BlockingInterface stub) { + this.stub = stub; + } + + /** + * Override that changes Exception from {@link Exception} to {@link IOException}. It also does + * setup of an rpcController and calls through to the unimplemented + * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. + */ + @Override + public T call(int callTimeout) throws IOException { + if (this.rpcController != null) { + this.rpcController.setCallTimeout(callTimeout); + } + try { + return call(this.rpcController); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + /** + * Run RPC call. + * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a + * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this + * class. + * @throws Exception + */ + protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; + + public PayloadCarryingRpcController getRpcController() { + return this.rpcController; + } + + long getNonceGroup() { + return getConnection().getNonceGenerator().getNonceGroup(); + } + + long getNonce() { + return this.nonce; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java deleted file mode 100644 index d94f069..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PayloadCarryingServerCallable.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; - -/** - * This class is used to unify HTable calls with AsyncProcess Framework. - * HTable can use AsyncProcess directly though this class. - */ -@InterfaceAudience.Private -public abstract class PayloadCarryingServerCallable<T> - extends RegionServerCallable<T> implements Cancellable { - protected PayloadCarryingRpcController controller; - - public PayloadCarryingServerCallable(Connection connection, TableName tableName, byte[] row, - RpcControllerFactory rpcControllerFactory) { - super(connection, tableName, row); - this.controller = rpcControllerFactory.newController(); - } - - @Override - public void cancel() { - controller.startCancel(); - } - - @Override - public boolean isCancelled() { - return controller.isCanceled(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java index 54c93a0..4e347dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionAdminServiceCallable.java @@ -27,31 +27,30 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.util.Bytes; /** - * Similar to {@link RegionServerCallable} but for the AdminService interface. This service callable + * Similar to RegionServerCallable but for the AdminService interface. This service callable * assumes a Table and row and thus does region locating similar to RegionServerCallable. + * Works against Admin stub rather than Client stub. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD", justification="stub used by ipc") @InterfaceAudience.Private public abstract class RegionAdminServiceCallable<T> implements RetryingCallable<T> { - - protected final ClusterConnection connection; - - protected final RpcControllerFactory rpcControllerFactory; - protected AdminService.BlockingInterface stub; + protected final RpcControllerFactory rpcControllerFactory; + private PayloadCarryingRpcController controller = null; + protected final ClusterConnection connection; protected HRegionLocation location; - protected final TableName tableName; protected final byte[] row; protected final int replicaId; - protected final static int MIN_WAIT_DEAD_SERVER = 10000; public RegionAdminServiceCallable(ClusterConnection connection, @@ -82,16 +81,13 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable< if (Thread.interrupted()) { throw new InterruptedIOException(); } - if (reload || location == null) { location = getLocation(!reload); } - if (location == null) { // With this exception, there will be a retry. throw new HBaseIOException(getExceptionMessage()); } - this.setStub(connection.getAdmin(location.getServerName())); } @@ -167,7 +163,39 @@ public abstract class RegionAdminServiceCallable<T> implements RetryingCallable< if (rl == null) { throw new RetriesExhaustedException("Can't get the locations"); } - return rl; } -} + + /** + * Override that changes Exception from {@link Exception} to {@link IOException}. It also does + * setup of an rpcController and calls through to the unimplemented + * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. + */ + @Override + // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate + // and so we contain references to protobuf. We can't set priority on the rpcController as + // we do in RegionServerCallable because we don't always have a Table when we call. + public T call(int callTimeout) throws IOException { + this.controller = rpcControllerFactory.newController(); + this.controller.setPriority(this.tableName); + this.controller.setCallTimeout(callTimeout); + try { + return call(this.controller); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + PayloadCarryingRpcController getCurrentPayloadCarryingRpcController() { + return this.controller; + } + + /** + * Run RPC call. + * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a + * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this + * class. + * @throws Exception + */ + protected abstract T call(PayloadCarryingRpcController rpcController) throws Exception; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index d878bae..3771c50 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -1,5 +1,4 @@ /** - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,34 +20,62 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import com.google.protobuf.RpcController; + /** - * Implementations call a RegionServer and implement {@link #call(int)}. - * Passed to a {@link RpcRetryingCaller} so we retry on fail. - * TODO: this class is actually tied to one region, because most of the paths make use of + * Implementations make an rpc call against a RegionService via a protobuf Service. + * Implement rpcCall(). Be sure to make use of the RpcController that this instance is carrying + * via {@link #getRpcController()}. + * + * <p>TODO: this class is actually tied to one region, because most of the paths make use of * the regioninfo part of location when building requests. The only reason it works for * multi-region requests (e.g. batch) is that they happen to not use the region parts. * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, * RegionCallable and actual RegionServerCallable with ServerName. + * * @param <T> the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> implements - RetryingCallable<T> { - +public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> { private ClientService.BlockingInterface stub; + /* This is 99% of the time a PayloadCarryingRpcController but this RegionServerCallable is + * also used doing Coprocessor Endpoints and in this case, it is a ServerRpcControllable which is + * not a PayloadCarryingRpcController. Too hard to untangle it all at this stage since + * downstreamers are using RegionServerCallable invoking CPEPs so just do ugly instanceof + * checks in the below. + */ + private final RpcController rpcController; + /** * @param connection Connection to use. * @param tableName Table name to which <code>row</code> belongs. * @param row The row we want in <code>tableName</code>. */ - public RegionServerCallable(Connection connection, TableName tableName, byte [] row) { + public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, + TableName tableName, byte [] row) { + this(connection, rpcControllerFactory.newController(), tableName, row); + } + + public RegionServerCallable(Connection connection, RpcController rpcController, + TableName tableName, byte [] row) { super(connection, tableName, row); + this.rpcController = rpcController; + // If it is an instance of PayloadCarryingRpcController, we can set priority on the + // controller based off the tableName. RpcController may be null in tests when mocking so allow + // for null controller. + if (this.rpcController != null && this.rpcController instanceof PayloadCarryingRpcController) { + ((PayloadCarryingRpcController)this.rpcController).setPriority(tableName); + } } void setClientByServiceName(ServerName service) throws IOException { @@ -69,4 +96,55 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab void setStub(final ClientService.BlockingInterface stub) { this.stub = stub; } -} + + /** + * Override that changes call Exception from {@link Exception} to {@link IOException}. It also + * does setup of an rpcController and calls through to the unimplemented + * rpcCall() method. If rpcController is an instance of PayloadCarryingRpcController, + * we will set a timeout on it. + */ + @Override + public T call(int callTimeout) throws IOException { + try { + if (this.rpcController != null && + this.rpcController instanceof PayloadCarryingRpcController) { + ((PayloadCarryingRpcController)this.rpcController).setCallTimeout(callTimeout); + // Do a reset of the CellScanner in case we are carrying any Cells since last time through. + setRpcControllerCellScanner(null); + } + return rpcCall(); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + /** + * Run the RPC call. Implement this method. To get at the rpcController that has been created + * and configured to make this rpc call, use getRpcController(). We are trying to contain + * rpcController references so we don't pollute codebase with protobuf references; keep the + * protobuf references contained and only present in a few classes rather than all about the + * code base. + * @throws Exception + */ + protected abstract T rpcCall() throws Exception; + + protected RpcController getRpcController() { + return this.rpcController; + } + + /** + * Get the RpcController CellScanner. + * If the RpcController is a PayloadCarryingRpcController, which it is in all cases except + * when we are processing Coprocessor Endpoint, then this method returns a reference to the + * CellScanner that the PayloadCarryingRpcController is carrying. Do it up here in this Callable + * so we don't have to scatter ugly instanceof tests around the codebase. Will fail if called in + * a Coproccessor Endpoint context. Should never happen. + */ + protected CellScanner getRpcControllerCellScanner() { + return ((PayloadCarryingRpcController)this.rpcController).cellScanner(); + } + + protected void setRpcControllerCellScanner(CellScanner cellScanner) { + ((PayloadCarryingRpcController)this.rpcController).setCellScanner(cellScanner); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java index 2377a0d..afbcc9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java @@ -36,4 +36,4 @@ public interface RetryingCallable<T> extends RetryingCallableBase { * @throws Exception if unable to compute a result */ T call(int callTimeout) throws Exception; -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java index 24288e6..b9438e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; * Tracks the amount of time remaining for an operation. */ class RetryingTimeTracker { - private long globalStartTime = -1; public void start() { @@ -38,16 +37,19 @@ class RetryingTimeTracker { if (callTimeout == Integer.MAX_VALUE) { return Integer.MAX_VALUE; } - int remainingTime = (int) ( - callTimeout - - (EnvironmentEdgeManager.currentTime() - this.globalStartTime)); + long remaining = EnvironmentEdgeManager.currentTime() - this.globalStartTime; + long remainingTime = callTimeout - remaining; if (remainingTime < 1) { // If there is no time left, we're trying anyway. It's too late. // 0 means no timeout, and it's not the intent here. So we secure both cases by // resetting to the minimum. remainingTime = 1; } - return remainingTime; + if (remainingTime > Integer.MAX_VALUE) { + throw new RuntimeException("remainingTime=" + remainingTime + + " which is > Integer.MAX_VALUE"); + } + return (int)remainingTime; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index 0c2d345..a5bebd0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -176,9 +176,9 @@ public class ReversedScannerCallable extends ScannerCallable { @Override public ScannerCallable getScannerCallableForReplica(int id) { - ReversedScannerCallable r = new ReversedScannerCallable(this.cConnection, this.tableName, - this.getScan(), this.scanMetrics, this.locateStartRow, controllerFactory, id); + ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName, + this.getScan(), this.scanMetrics, this.locateStartRow, rpcControllerFactory, id); r.setCaching(this.getCaching()); return r; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java new file mode 100644 index 0000000..68a4aa2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; + +/** + * A RetryingCallable for RPC connection operations. + * @param <V> return type + */ +abstract class RpcRetryingCallable<V> implements RetryingCallable<V>, Closeable { + @Override + public void prepare(boolean reload) throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public void throwable(Throwable t, boolean retrying) { + } + + @Override + public String getExceptionMessageAdditionalDetail() { + return ""; + } + + @Override + public long sleep(long pause, int tries) { + return ConnectionUtils.getPauseTime(pause, tries); + } + + @Override + // Same trick as in RegionServerCallable so users don't have to copy/paste so much boilerplate + // and so we contain references to protobuf. + public V call(int callTimeout) throws IOException { + try { + return rpcCall(callTimeout); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); + } + } + + protected abstract V rpcCall(int callTimeout) throws Exception; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index b4cd2ef..2b2e4c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -22,9 +22,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import java.io.IOException; -/** - * - */ @InterfaceAudience.Public @InterfaceStability.Evolving public interface RpcRetryingCaller<T> { @@ -52,4 +49,4 @@ public interface RpcRetryingCaller<T> { */ T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException; -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 1c723c5..f92aeae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -36,6 +36,7 @@ public class RpcRetryingCallerFactory { private final int rpcTimeout; private final RetryingCallerInterceptor interceptor; private final int startLogErrorsCnt; + /* These below data members are UNUSED!!!*/ private final boolean enableBackPressure; private ServerStatisticTracker stats; http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 65dbb10..8d63295 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -29,8 +29,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -46,8 +44,6 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.protobuf.ServiceException; - /** * Caller that goes to replica if the primary region does no answer within a configurable @@ -57,8 +53,6 @@ import com.google.protobuf.ServiceException; */ @InterfaceAudience.Private public class RpcRetryingCallerWithReadReplicas { - private static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class); - protected final ExecutorService pool; protected final ClusterConnection cConnection; protected final Configuration conf; @@ -98,7 +92,7 @@ public class RpcRetryingCallerWithReadReplicas { private final PayloadCarryingRpcController controller; public ReplicaRegionServerCallable(int id, HRegionLocation location) { - super(RpcRetryingCallerWithReadReplicas.this.cConnection, + super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); this.id = id; this.location = location; @@ -141,28 +135,22 @@ public class RpcRetryingCallerWithReadReplicas { } @Override - public Result call(int callTimeout) throws Exception { + protected Result rpcCall() throws Exception { if (controller.isCanceled()) return null; - if (Thread.interrupted()) { throw new InterruptedIOException(); } - byte[] reg = location.getRegionInfo().getRegionName(); - ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); - controller.setCallTimeout(callTimeout); - - try { - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) { - return null; - } - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + // Presumption that we are passed a PayloadCarryingRpcController here! + PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller; + pcrc.setCallTimeout(callTimeout); + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) { + return null; } + return ProtobufUtil.toResult(response.getResult(), pcrc.cellScanner()); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 72d69ec..0ee54d0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -52,9 +51,6 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; -import com.google.protobuf.ServiceException; -import com.google.protobuf.TextFormat; - /** * Scanner operations such as create, next, etc. * Used by {@link ResultScanner}s made by {@link Table}. Passed to a retrying caller such as @@ -74,7 +70,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected boolean renew = false; private Scan scan; private int caching = 1; - protected final ClusterConnection cConnection; protected ScanMetrics scanMetrics; private boolean logScannerActivity = false; private int logCutOffLatency = 1000; @@ -99,8 +94,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { // indicate if it is a remote server call protected boolean isRegionServerRemote = true; private long nextCallSeq = 0; - protected RpcControllerFactory controllerFactory; - protected PayloadCarryingRpcController controller; + protected final RpcControllerFactory rpcControllerFactory; /** * @param connection which connection @@ -125,19 +119,14 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { */ public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { - super(connection, tableName, scan.getStartRow()); + super(connection, rpcControllerFactory, tableName, scan.getStartRow()); this.id = id; - this.cConnection = connection; this.scan = scan; this.scanMetrics = scanMetrics; Configuration conf = connection.getConfiguration(); logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false); logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000); - this.controllerFactory = rpcControllerFactory; - } - - PayloadCarryingRpcController getController() { - return controller; + this.rpcControllerFactory = rpcControllerFactory; } /** @@ -185,25 +174,16 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { } } - - @Override - public Result [] call(int callTimeout) throws IOException { + protected Result [] rpcCall() throws Exception { if (Thread.interrupted()) { throw new InterruptedIOException(); } - - if (controller == null) { - controller = controllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - } - - if (closed) { - if (scannerId != -1) { + if (this.closed) { + if (this.scannerId != -1) { close(); } } else { - if (scannerId == -1L) { + if (this.scannerId == -1L) { this.scannerId = openScanner(); } else { Result [] rrs = null; @@ -212,61 +192,54 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { setHeartbeatMessage(false); try { incRPCcallsMetrics(); - request = - RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, + request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, this.scanMetrics != null, renew); ScanResponse response = null; - try { - response = getStub().scan(controller, request); - // Client and RS maintain a nextCallSeq number during the scan. Every next() call - // from client to server will increment this number in both sides. Client passes this - // number along with the request and at RS side both the incoming nextCallSeq and its - // nextCallSeq will be matched. In case of a timeout this increment at the client side - // should not happen. If at the server side fetching of next batch of data was over, - // there will be mismatch in the nextCallSeq number. Server will throw - // OutOfOrderScannerNextException and then client will reopen the scanner with startrow - // as the last successfully retrieved row. - // See HBASE-5974 - nextCallSeq++; - long timestamp = System.currentTimeMillis(); - setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); - // Results are returned via controller - CellScanner cellScanner = controller.cellScanner(); - rrs = ResponseConverter.getResults(cellScanner, response); - if (logScannerActivity) { - long now = System.currentTimeMillis(); - if (now - timestamp > logCutOffLatency) { - int rows = rrs == null ? 0 : rrs.length; - LOG.info("Took " + (now-timestamp) + "ms to fetch " + response = getStub().scan(getRpcController(), request); + // Client and RS maintain a nextCallSeq number during the scan. Every next() call + // from client to server will increment this number in both sides. Client passes this + // number along with the request and at RS side both the incoming nextCallSeq and its + // nextCallSeq will be matched. In case of a timeout this increment at the client side + // should not happen. If at the server side fetching of next batch of data was over, + // there will be mismatch in the nextCallSeq number. Server will throw + // OutOfOrderScannerNextException and then client will reopen the scanner with startrow + // as the last successfully retrieved row. + // See HBASE-5974 + nextCallSeq++; + long timestamp = System.currentTimeMillis(); + setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); + rrs = ResponseConverter.getResults(getRpcControllerCellScanner(), response); + if (logScannerActivity) { + long now = System.currentTimeMillis(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 0 : rrs.length; + LOG.info("Took " + (now-timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId); - } } - updateServerSideMetrics(response); - // moreResults is only used for the case where a filter exhausts all elements - if (response.hasMoreResults() && !response.getMoreResults()) { - scannerId = -1L; - closed = true; - // Implied that no results were returned back, either. - return null; - } - // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due - // to size or quantity of results in the response. - if (response.hasMoreResultsInRegion()) { - // Set what the RS said - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - // Server didn't respond whether it has more results or not. - setHasMoreResultsContext(false); - } - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + } + updateServerSideMetrics(response); + // moreResults is only used for the case where a filter exhausts all elements + if (response.hasMoreResults() && !response.getMoreResults()) { + this.scannerId = -1L; + this.closed = true; + // Implied that no results were returned back, either. + return null; + } + // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due + // to size or quantity of results in the response. + if (response.hasMoreResultsInRegion()) { + // Set what the RS said + setHasMoreResultsContext(true); + setServerHasMoreResults(response.getMoreResultsInRegion()); + } else { + // Server didn't respond whether it has more results or not. + setHasMoreResultsContext(false); } updateResultsMetrics(rrs); } catch (IOException e) { if (logScannerActivity) { - LOG.info("Got exception making request " + TextFormat.shortDebugString(request) - + " to " + getLocation(), e); + LOG.info("Got exception making request " + ProtobufUtil.toText(request) + " to " + + getLocation(), e); } IOException ioe = e; if (e instanceof RemoteException) { @@ -275,9 +248,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { if (logScannerActivity && (ioe instanceof UnknownScannerException)) { try { HRegionLocation location = - getConnection().relocateRegion(getTableName(), scan.getStartRow()); - LOG.info("Scanner=" + scannerId - + " expired, current region location is " + location.toString()); + getConnection().relocateRegion(getTableName(), scan.getStartRow()); + LOG.info("Scanner=" + scannerId + " expired, current region location is " + + location.toString()); } catch (Throwable t) { LOG.info("Failed to relocate region", t); } @@ -375,9 +348,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true, this.scanMetrics != null); try { - getStub().scan(controller, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + getStub().scan(getRpcController(), request); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); } } catch (IOException e) { LOG.warn("Ignore, probably already closed", e); @@ -387,20 +360,18 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected long openScanner() throws IOException { incRPCcallsMetrics(); - ScanRequest request = - RequestConverter.buildScanRequest( - getLocation().getRegionInfo().getRegionName(), - this.scan, 0, false); + ScanRequest request = RequestConverter.buildScanRequest( + getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); try { - ScanResponse response = getStub().scan(controller, request); + ScanResponse response = getStub().scan(getRpcController(), request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() + " on region " + getLocation().toString()); } return id; - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); } } @@ -443,11 +414,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { return caching; } - @Override - public ClusterConnection getConnection() { - return cConnection; - } - /** * Set the number of rows that will be fetched on next * @param caching the number of rows for caching @@ -458,7 +424,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { public ScannerCallable getScannerCallableForReplica(int id) { ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName, - this.getScan(), this.scanMetrics, controllerFactory, id); + this.getScan(), this.scanMetrics, this.rpcControllerFactory, id); s.setCaching(this.caching); return s; } @@ -488,4 +454,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) { this.serverHasMoreResultsContext = serverHasMoreResultsContext; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/45bb6180/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index c3a3834..096841b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -267,7 +267,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { /** * When a scanner switches in the middle of scanning (the 'next' call fails * for example), the upper layer {@link ClientScanner} needs to know - * @return */ public boolean switchedToADifferentReplica() { return replicaSwitched.get(); @@ -398,8 +397,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { public void cancel() { cancelled = true; caller.cancel(); - if (callable.getController() != null) { - callable.getController().startCancel(); + if (callable.getRpcController() != null) { + callable.getRpcController().startCancel(); } someRPCcancelled = true; }