HBASE-17397 AggregationClient cleanup; Reapplied with proper JIRA number (spotted by Duo Zhang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/05ab41d1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/05ab41d1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/05ab41d1 Branch: refs/heads/hbase-12439 Commit: 05ab41d1bea53295d2c0790fba71c441ff85a6a5 Parents: 0583d79 Author: Michael Stack <st...@apache.org> Authored: Tue Jan 3 19:17:17 2017 -0800 Committer: Michael Stack <st...@apache.org> Committed: Tue Jan 3 19:17:17 2017 -0800 ---------------------------------------------------------------------- .../client/coprocessor/AggregationClient.java | 94 +++++++++++++++----- 1 file changed, 71 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/05ab41d1/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index cde7d41..d236342 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -49,7 +49,6 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; -import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; @@ -59,6 +58,8 @@ import org.apache.hadoop.hbase.util.Pair; import com.google.protobuf.ByteString; import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; /** * This client class is for invoking the aggregate functions deployed on the @@ -81,13 +82,60 @@ import com.google.protobuf.Message; * </ul> * <p>Call {@link #close()} when done. */ -@InterfaceAudience.Private +@InterfaceAudience.Public public class AggregationClient implements Closeable { // TODO: This class is not used. Move to examples? private static final Log log = LogFactory.getLog(AggregationClient.class); private final Connection connection; /** + * An RpcController implementation for use here in this endpoint. + */ + static class AggregationClientRpcController implements RpcController { + private String errorText; + private boolean cancelled = false; + private boolean failed = false; + + @Override + public String errorText() { + return this.errorText; + } + + @Override + public boolean failed() { + return this.failed; + } + + @Override + public boolean isCanceled() { + return this.cancelled; + } + + @Override + public void notifyOnCancel(RpcCallback<Object> arg0) { + throw new UnsupportedOperationException(); + } + + @Override + public void reset() { + this.errorText = null; + this.cancelled = false; + this.failed = false; + } + + @Override + public void setFailed(String errorText) { + this.failed = true; + this.errorText = errorText; + } + + @Override + public void startCancel() { + this.cancelled = true; + } + } + + /** * Constructor with Conf object * @param cfg */ @@ -160,13 +208,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, R>() { @Override public R call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getMax(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } if (response.getFirstPartCount() > 0) { ByteString b = response.getFirstPart(0); @@ -248,13 +296,13 @@ public class AggregationClient implements Closeable { @Override public R call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getMin(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } if (response.getFirstPartCount() > 0) { ByteString b = response.getFirstPart(0); @@ -323,13 +371,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, Long>() { @Override public Long call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getRowNum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); @@ -388,14 +436,14 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, S>() { @Override public S call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); // Not sure what is going on here why I have to do these casts. TODO. CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getSum(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } if (response.getFirstPartCount() == 0) { return null; @@ -456,13 +504,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, Pair<S, Long>>() { @Override public Pair<S, Long> call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getAvg(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } Pair<S, Long> pair = new Pair<S, Long>(null, 0L); if (response.getFirstPartCount() == 0) { @@ -560,13 +608,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, Pair<List<S>, Long>>() { @Override public Pair<List<S>, Long> call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getStd(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L); if (response.getFirstPartCount() == 0) { @@ -676,13 +724,13 @@ public class AggregationClient implements Closeable { new Batch.Call<AggregateService, List<S>>() { @Override public List<S> call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); + RpcController controller = new AggregationClientRpcController(); CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); instance.getMedian(controller, requestArg, rpcCallback); AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); + if (controller.failed()) { + throw new IOException(controller.errorText()); } List<S> list = new ArrayList<S>();