HBASE-16607 Make NoncedRegionServerCallable extend CancellableRegionServerCallable
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c19d2cab Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c19d2cab Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c19d2cab Branch: refs/heads/hbase-12439 Commit: c19d2cabbd4c6e312e4926f72d348a5e554cd3dd Parents: 2c3b0f2 Author: chenheng <chenh...@apache.org> Authored: Mon Sep 12 11:03:29 2016 +0800 Committer: chenheng <chenh...@apache.org> Committed: Mon Sep 12 11:03:29 2016 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/HTable.java | 52 +++++++------- .../client/NoncedRegionServerCallable.java | 74 ++------------------ 2 files changed, 31 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c19d2cab/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index e98424c..0d1b156 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -630,17 +630,17 @@ public class HTable implements Table { public Result append(final Append append) throws IOException { checkHasFamilies(append); NoncedRegionServerCallable<Result> callable = - new NoncedRegionServerCallable<Result>(this.connection, - this.rpcControllerFactory, getName(), append.getRow()) { - @Override - protected Result call(HBaseRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, 
request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } - }; + new NoncedRegionServerCallable<Result>(this.connection, this.rpcControllerFactory, + getName(), append.getRow()) { + @Override + protected Result rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), append, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + if (!response.hasResult()) return null; + return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); + } + }; return rpcCallerFactory.<Result> newCaller(this.writeRpcTimeout). callWithRetries(callable, this.operationTimeout); } @@ -652,16 +652,16 @@ public class HTable implements Table { public Result increment(final Increment increment) throws IOException { checkHasFamilies(increment); NoncedRegionServerCallable<Result> callable = - new NoncedRegionServerCallable<Result>(this.connection, - this.rpcControllerFactory, getName(), increment.getRow()) { - @Override - protected Result call(HBaseRpcController controller) throws Exception { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, request); - // Should this check for null like append does? 
- return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } + new NoncedRegionServerCallable<Result>(this.connection, + this.rpcControllerFactory, getName(), increment.getRow()) { + @Override + protected Result rpcCall() throws Exception { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), increment, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + // Should this check for null like append does? + return ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); + } }; return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); @@ -701,12 +701,12 @@ public class HTable implements Table { new NoncedRegionServerCallable<Long>(this.connection, this.rpcControllerFactory, getName(), row) { @Override - protected Long call(HBaseRpcController controller) throws Exception { + protected Long rpcCall() throws Exception { MutateRequest request = RequestConverter.buildIncrementRequest( - getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability, getNonceGroup(), getNonce()); - MutateResponse response = getStub().mutate(controller, request); - Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, amount, durability, getNonceGroup(), getNonce()); + MutateResponse response = getStub().mutate(getRpcController(), request); + Result result = ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()); return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); } }; http://git-wip-us.apache.org/repos/asf/hbase/blob/c19d2cab/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index 7c98861..7c01e21 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -18,15 +18,9 @@ package org.apache.hadoop.hbase.client; -import java.io.IOException; - -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; /** * Implementations make an rpc call against a RegionService via a protobuf Service. @@ -44,9 +38,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; * @param <T> the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServerCallable<T> { - private ClientService.BlockingInterface stub; - private final HBaseRpcController rpcController; +public abstract class NoncedRegionServerCallable<T> extends CancellableRegionServerCallable<T> { private final long nonce; /** @@ -54,69 +46,13 @@ public abstract class NoncedRegionServerCallable<T> extends AbstractRegionServer * @param tableName Table name to which <code>row</code> belongs. * @param row The row we want in <code>tableName</code>. 
*/ - public NoncedRegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, - TableName tableName, byte [] row) { - this(connection, rpcControllerFactory.newController(), tableName, row); - } - - public NoncedRegionServerCallable(Connection connection, HBaseRpcController rpcController, - TableName tableName, byte [] row) { - super(connection, tableName, row); - this.rpcController = rpcController; + public NoncedRegionServerCallable(Connection connection, + RpcControllerFactory rpcControllerFactory, + TableName tableName, byte [] row) { + super(connection, tableName, row, rpcControllerFactory); this.nonce = getConnection().getNonceGenerator().newNonce(); } - void setClientByServiceName(ServerName service) throws IOException { - this.setStub(getConnection().getClient(service)); - } - - /** - * @return Client Rpc protobuf communication stub - */ - protected ClientService.BlockingInterface getStub() { - return this.stub; - } - - /** - * Set the client protobuf communication stub - * @param stub to set - */ - void setStub(final ClientService.BlockingInterface stub) { - this.stub = stub; - } - - /** - * Override that changes Exception from {@link Exception} to {@link IOException}. It also does - * setup of an rpcController and calls through to the unimplemented - * call(PayloadCarryingRpcController) method; implement this method to add your rpc invocation. - */ - @Override - public T call(int callTimeout) throws IOException { - if (this.rpcController != null) { - this.rpcController.reset(); - this.rpcController.setPriority(tableName); - this.rpcController.setCallTimeout(callTimeout); - } - try { - return call(this.rpcController); - } catch (Exception e) { - throw ProtobufUtil.handleRemoteException(e); - } - } - - /** - * Run RPC call. - * @param rpcController PayloadCarryingRpcController is a mouthful but it at a minimum is a - * facade on protobuf so we don't have to put protobuf everywhere; we can keep it behind this - * class. 
-   * @throws Exception
-   */
-  protected abstract T call(HBaseRpcController rpcController) throws Exception;
-
-  public HBaseRpcController getRpcController() {
-    return this.rpcController;
-  }
-
   long getNonceGroup() {
     return getConnection().getNonceGenerator().getNonceGroup();
   }