HBASE-17936 Refine sum endpoint example in ref guide

Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d15f75b3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d15f75b3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d15f75b3

Branch: refs/heads/hbase-12439
Commit: d15f75b3cfc5de4def04e94cbb965fd7f578dc34
Parents: 75d1e03
Author: Xiang Li <wate...@gmail.com>
Authored: Tue Apr 18 20:25:37 2017 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Tue Apr 18 09:33:09 2017 -0700

----------------------------------------------------------------------
 src/main/asciidoc/_chapters/cp.adoc | 60 ++++++++++++++++++--------------
 1 file changed, 34 insertions(+), 26 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hbase/blob/d15f75b3/src/main/asciidoc/_chapters/cp.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/cp.adoc b/src/main/asciidoc/_chapters/cp.adoc
index d3fcd47..d0dcfef 100644
--- a/src/main/asciidoc/_chapters/cp.adoc
+++ b/src/main/asciidoc/_chapters/cp.adoc
@@ -610,7 +610,7 @@ The effect is that the duplicate coprocessor is effectively ignored.
+ [source, java] ---- -public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService { +public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService { private RegionCoprocessorEnvironment env; @@ -630,31 +630,33 @@ public class SumEndPoint extends SumService implements Coprocessor, CoprocessorS @Override public void stop(CoprocessorEnvironment env) throws IOException { - // do mothing + // do nothing } @Override - public void getSum(RpcController controller, SumRequest request, RpcCallback done) { + public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) { Scan scan = new Scan(); scan.addFamily(Bytes.toBytes(request.getFamily())); scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn())); - SumResponse response = null; + + Sum.SumResponse response = null; InternalScanner scanner = null; + try { scanner = env.getRegion().getScanner(scan); - List results = new ArrayList(); + List<Cell> results = new ArrayList<>(); boolean hasMore = false; - long sum = 0L; - do { - hasMore = scanner.next(results); - for (Cell cell : results) { - sum = sum + Bytes.toLong(CellUtil.cloneValue(cell)); - } - results.clear(); - } while (hasMore); + long sum = 0L; - response = SumResponse.newBuilder().setSum(sum).build(); + do { + hasMore = scanner.next(results); + for (Cell cell : results) { + sum = sum + Bytes.toLong(CellUtil.cloneValue(cell)); + } + results.clear(); + } while (hasMore); + response = Sum.SumResponse.newBuilder().setSum(sum).build(); } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } finally { @@ -664,6 +666,7 @@ public class SumEndPoint extends SumService implements Coprocessor, CoprocessorS } catch (IOException ignored) {} } } + done.run(response); } } @@ -681,24 +684,29 @@ Table table = connection.getTable(tableName); //HConnection connection = HConnectionManager.createConnection(conf); //HTableInterface 
table = connection.getTable("users"); -final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross") - .build(); +final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build(); try { -Map<byte[], Long> results = table.CoprocessorService (SumService.class, null, null, -new Batch.Call<SumService, Long>() { - @Override - public Long call(SumService aggregate) throws IOException { -BlockingRpcCallback rpcCallback = new BlockingRpcCallback(); - aggregate.getSum(null, request, rpcCallback); - SumResponse response = rpcCallback.get(); - return response.hasSum() ? response.getSum() : 0L; + Map<byte[], Long> results = table.coprocessorService( + Sum.SumService.class, + null, /* start key */ + null, /* end key */ + new Batch.Call<Sum.SumService, Long>() { + @Override + public Long call(Sum.SumService aggregate) throws IOException { + BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>(); + aggregate.getSum(null, request, rpcCallback); + Sum.SumResponse response = rpcCallback.get(); + + return response.hasSum() ? response.getSum() : 0L; + } } - }); + ); + for (Long sum : results.values()) { System.out.println("Sum = " + sum); } } catch (ServiceException e) { -e.printStackTrace(); + e.printStackTrace(); } catch (Throwable e) { e.printStackTrace(); }