http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java new file mode 100644 index 0000000..cde7d41 --- /dev/null +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -0,0 +1,864 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; + +/** + * This client class is for invoking the aggregate functions deployed on the + * Region Server side via the AggregateService. This class will implement the + * supporting functionality for summing/processing the individual results + * obtained from the AggregateService for each region. + * <p> + * This will serve as the client side handler for invoking the aggregate + * functions. + * For all aggregate functions, + * <ul> + * <li>start row < end row is an essential condition (if they are not + * {@link HConstants#EMPTY_BYTE_ARRAY}) + * <li>Column family can't be null. In case where multiple families are + * provided, an IOException will be thrown. An optional column qualifier can + * also be defined.</li> + * <li>For methods to find maximum, minimum, sum, rowcount, it returns the + * parameter type. For average and std, it returns a double value. For row + * count, it returns a long value.</li> + * </ul> + * <p>Call {@link #close()} when done. + */ +@InterfaceAudience.Private +public class AggregationClient implements Closeable { + // TODO: This class is not used. Move to examples? + private static final Log log = LogFactory.getLog(AggregationClient.class); + private final Connection connection; + + /** + * Constructor with Conf object + * @param cfg + */ + public AggregationClient(Configuration cfg) { + try { + // Create a connection on construction. Will use it making each of the calls below. + this.connection = ConnectionFactory.createConnection(cfg); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + if (this.connection != null && !this.connection.isClosed()) { + this.connection.close(); + } + } + + /** + * It gives the maximum value of a column for a given column family for the + * given range. In case qualifier is null, a max of all values for the given + * family is returned. + * @param tableName + * @param ci + * @param scan + * @return max val <R> + * @throws Throwable + * The caller is supposed to handle the exception as they are thrown + * & propagated to it. + */ + public <R, S, P extends Message, Q extends Message, T extends Message> R max( + final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) + throws Throwable { + try (Table table = connection.getTable(tableName)) { + return max(table, ci, scan); + } + } + + /** + * It gives the maximum value of a column for a given column family for the + * given range. In case qualifier is null, a max of all values for the given + * family is returned. + * @param table + * @param ci + * @param scan + * @return max val <> + * @throws Throwable + * The caller is supposed to handle the exception as they are thrown + * & propagated to it. + */ + public <R, S, P extends Message, Q extends Message, T extends Message> + R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, + final Scan scan) throws Throwable { + final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); + class MaxCallBack implements Batch.Callback<R> { + R max = null; + + R getMax() { + return max; + } + + @Override + public synchronized void update(byte[] region, byte[] row, R result) { + max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max; + } + } + MaxCallBack aMaxCallBack = new MaxCallBack(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call<AggregateService, R>() { + @Override + public R call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); + instance.getMax(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + if (response.getFirstPartCount() > 0) { + ByteString b = response.getFirstPart(0); + Q q = getParsedGenericInstance(ci.getClass(), 3, b); + return ci.getCellValueFromProto(q); + } + return null; + } + }, aMaxCallBack); + return aMaxCallBack.getMax(); + } + + /* + * @param scan + * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan + */ + private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException { + if (scan == null + || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals( + scan.getStartRow(), HConstants.EMPTY_START_ROW)) + || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals( + scan.getStopRow(), HConstants.EMPTY_END_ROW))) { + throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow"); + } else if (!canFamilyBeAbsent) { + if (scan.getFamilyMap().size() != 1) { + throw new IOException("There must be only one family."); + } + } + } + + /** + * It gives the minimum value of a column for a given column family for the + * given range. In case qualifier is null, a min of all values for the given + * family is returned. + * @param tableName + * @param ci + * @param scan + * @return min val <R> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> R min( + final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) + throws Throwable { + try (Table table = connection.getTable(tableName)) { + return min(table, ci, scan); + } + } + + /** + * It gives the minimum value of a column for a given column family for the + * given range. In case qualifier is null, a min of all values for the given + * family is returned. + * @param table + * @param ci + * @param scan + * @return min val <R> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> + R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, + final Scan scan) throws Throwable { + final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); + class MinCallBack implements Batch.Callback<R> { + + private R min = null; + + public R getMinimum() { + return min; + } + + @Override + public synchronized void update(byte[] region, byte[] row, R result) { + min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min; + } + } + MinCallBack minCallBack = new MinCallBack(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call<AggregateService, R>() { + + @Override + public R call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); + instance.getMin(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + if (response.getFirstPartCount() > 0) { + ByteString b = response.getFirstPart(0); + Q q = getParsedGenericInstance(ci.getClass(), 3, b); + return ci.getCellValueFromProto(q); + } + return null; + } + }, minCallBack); + log.debug("Min fom all regions is: " + minCallBack.getMinimum()); + return minCallBack.getMinimum(); + } + + /** + * It gives the row count, by summing up the individual results obtained from + * regions. In case the qualifier is null, FirstKeyValueFilter is used to + * optimised the operation. In case qualifier is provided, I can't use the + * filter as it may set the flag to skip to next row, but the value read is + * not of the given filter: in this case, this particular row will not be + * counted ==> an error. + * @param tableName + * @param ci + * @param scan + * @return <R, S> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount( + final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) + throws Throwable { + try (Table table = connection.getTable(tableName)) { + return rowCount(table, ci, scan); + } + } + + /** + * It gives the row count, by summing up the individual results obtained from + * regions. In case the qualifier is null, FirstKeyValueFilter is used to + * optimised the operation. In case qualifier is provided, I can't use the + * filter as it may set the flag to skip to next row, but the value read is + * not of the given filter: in this case, this particular row will not be + * counted ==> an error. + * @param table + * @param ci + * @param scan + * @return <R, S> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> + long rowCount(final Table table, + final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { + final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true); + class RowNumCallback implements Batch.Callback<Long> { + private final AtomicLong rowCountL = new AtomicLong(0); + + public long getRowNumCount() { + return rowCountL.get(); + } + + @Override + public void update(byte[] region, byte[] row, Long result) { + rowCountL.addAndGet(result.longValue()); + } + } + RowNumCallback rowNum = new RowNumCallback(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call<AggregateService, Long>() { + @Override + public Long call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); + instance.getRowNum(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); + ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); + bb.rewind(); + return bb.getLong(); + } + }, rowNum); + return rowNum.getRowNumCount(); + } + + /** + * It sums up the value returned from various regions. In case qualifier is + * null, summation of all the column qualifiers in the given family is done. + * @param tableName + * @param ci + * @param scan + * @return sum <S> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> S sum( + final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) + throws Throwable { + try (Table table = connection.getTable(tableName)) { + return sum(table, ci, scan); + } + } + + /** + * It sums up the value returned from various regions. In case qualifier is + * null, summation of all the column qualifiers in the given family is done. + * @param table + * @param ci + * @param scan + * @return sum <S> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> + S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, + final Scan scan) throws Throwable { + final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); + + class SumCallBack implements Batch.Callback<S> { + S sumVal = null; + + public S getSumResult() { + return sumVal; + } + + @Override + public synchronized void update(byte[] region, byte[] row, S result) { + sumVal = ci.add(sumVal, result); + } + } + SumCallBack sumCallBack = new SumCallBack(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call<AggregateService, S>() { + @Override + public S call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + // Not sure what is going on here why I have to do these casts. TODO. + CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); + instance.getSum(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + if (response.getFirstPartCount() == 0) { + return null; + } + ByteString b = response.getFirstPart(0); + T t = getParsedGenericInstance(ci.getClass(), 4, b); + S s = ci.getPromotedValueFromProto(t); + return s; + } + }, sumCallBack); + return sumCallBack.getSumResult(); + } + + /** + * It computes average while fetching sum and row count from all the + * corresponding regions. Approach is to compute a global sum of region level + * sum and rowcount and then compute the average. + * @param tableName + * @param scan + * @throws Throwable + */ + private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs( + final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) + throws Throwable { + try (Table table = connection.getTable(tableName)) { + return getAvgArgs(table, ci, scan); + } + } + + /** + * It computes average while fetching sum and row count from all the + * corresponding regions. Approach is to compute a global sum of region level + * sum and rowcount and then compute the average. + * @param table + * @param scan + * @throws Throwable + */ + private <R, S, P extends Message, Q extends Message, T extends Message> + Pair<S, Long> getAvgArgs(final Table table, + final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { + final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); + class AvgCallBack implements Batch.Callback<Pair<S, Long>> { + S sum = null; + Long rowCount = 0l; + + public synchronized Pair<S, Long> getAvgArgs() { + return new Pair<S, Long>(sum, rowCount); + } + + @Override + public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) { + sum = ci.add(sum, result.getFirst()); + rowCount += result.getSecond(); + } + } + AvgCallBack avgCallBack = new AvgCallBack(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call<AggregateService, Pair<S, Long>>() { + @Override + public Pair<S, Long> call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); + instance.getAvg(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + Pair<S, Long> pair = new Pair<S, Long>(null, 0L); + if (response.getFirstPartCount() == 0) { + return pair; + } + ByteString b = response.getFirstPart(0); + T t = getParsedGenericInstance(ci.getClass(), 4, b); + S s = ci.getPromotedValueFromProto(t); + pair.setFirst(s); + ByteBuffer bb = ByteBuffer.allocate(8).put( + getBytesFromResponse(response.getSecondPart())); + bb.rewind(); + pair.setSecond(bb.getLong()); + return pair; + } + }, avgCallBack); + return avgCallBack.getAvgArgs(); + } + + /** + * This is the client side interface/handle for calling the average method for + * a given cf-cq combination. It was necessary to add one more call stack as + * its return type should be a decimal value, irrespective of what + * columninterpreter says. So, this methods collects the necessary parameters + * to compute the average and returs the double value. + * @param tableName + * @param ci + * @param scan + * @return <R, S> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> + double avg(final TableName tableName, + final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { + Pair<S, Long> p = getAvgArgs(tableName, ci, scan); + return ci.divideForAvg(p.getFirst(), p.getSecond()); + } + + /** + * This is the client side interface/handle for calling the average method for + * a given cf-cq combination. It was necessary to add one more call stack as + * its return type should be a decimal value, irrespective of what + * columninterpreter says. So, this methods collects the necessary parameters + * to compute the average and returs the double value. + * @param table + * @param ci + * @param scan + * @return <R, S> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> double avg( + final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { + Pair<S, Long> p = getAvgArgs(table, ci, scan); + return ci.divideForAvg(p.getFirst(), p.getSecond()); + } + + /** + * It computes a global standard deviation for a given column and its value. + * Standard deviation is square root of (average of squares - + * average*average). From individual regions, it obtains sum, square sum and + * number of rows. With these, the above values are computed to get the global + * std. + * @param table + * @param scan + * @return standard deviations + * @throws Throwable + */ + private <R, S, P extends Message, Q extends Message, T extends Message> + Pair<List<S>, Long> getStdArgs(final Table table, + final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { + final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); + class StdCallback implements Batch.Callback<Pair<List<S>, Long>> { + long rowCountVal = 0l; + S sumVal = null, sumSqVal = null; + + public synchronized Pair<List<S>, Long> getStdParams() { + List<S> l = new ArrayList<S>(); + l.add(sumVal); + l.add(sumSqVal); + Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal); + return p; + } + + @Override + public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) { + if (result.getFirst().size() > 0) { + sumVal = ci.add(sumVal, result.getFirst().get(0)); + sumSqVal = ci.add(sumSqVal, result.getFirst().get(1)); + rowCountVal += result.getSecond(); + } + } + } + StdCallback stdCallback = new StdCallback(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call<AggregateService, Pair<List<S>, Long>>() { + @Override + public Pair<List<S>, Long> call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); + instance.getStd(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L); + if (response.getFirstPartCount() == 0) { + return pair; + } + List<S> list = new ArrayList<S>(); + for (int i = 0; i < response.getFirstPartCount(); i++) { + ByteString b = response.getFirstPart(i); + T t = getParsedGenericInstance(ci.getClass(), 4, b); + S s = ci.getPromotedValueFromProto(t); + list.add(s); + } + pair.setFirst(list); + ByteBuffer bb = ByteBuffer.allocate(8).put( + getBytesFromResponse(response.getSecondPart())); + bb.rewind(); + pair.setSecond(bb.getLong()); + return pair; + } + }, stdCallback); + return stdCallback.getStdParams(); + } + + /** + * This is the client side interface/handle for calling the std method for a + * given cf-cq combination. It was necessary to add one more call stack as its + * return type should be a decimal value, irrespective of what + * columninterpreter says. So, this methods collects the necessary parameters + * to compute the std and returns the double value. + * @param tableName + * @param ci + * @param scan + * @return <R, S> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> + double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, + Scan scan) throws Throwable { + try (Table table = connection.getTable(tableName)) { + return std(table, ci, scan); + } + } + + /** + * This is the client side interface/handle for calling the std method for a + * given cf-cq combination. It was necessary to add one more call stack as its + * return type should be a decimal value, irrespective of what + * columninterpreter says. So, this methods collects the necessary parameters + * to compute the std and returns the double value. + * @param table + * @param ci + * @param scan + * @return <R, S> + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> double std( + final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { + Pair<List<S>, Long> p = getStdArgs(table, ci, scan); + double res = 0d; + double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond()); + double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond()); + res = avgOfSumSq - (avg) * (avg); // variance + res = Math.pow(res, 0.5); + return res; + } + + /** + * It helps locate the region with median for a given column whose weight + * is specified in an optional column. + * From individual regions, it obtains sum of values and sum of weights. + * @param table + * @param ci + * @param scan + * @return pair whose first element is a map between start row of the region + * and (sum of values, sum of weights) for the region, the second element is + * (sum of values, sum of weights) for all the regions chosen + * @throws Throwable + */ + private <R, S, P extends Message, Q extends Message, T extends Message> + Pair<NavigableMap<byte[], List<S>>, List<S>> + getMedianArgs(final Table table, + final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { + final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); + final NavigableMap<byte[], List<S>> map = + new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR); + class StdCallback implements Batch.Callback<List<S>> { + S sumVal = null, sumWeights = null; + + public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() { + List<S> l = new ArrayList<S>(); + l.add(sumVal); + l.add(sumWeights); + Pair<NavigableMap<byte[], List<S>>, List<S>> p = + new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l); + return p; + } + + @Override + public synchronized void update(byte[] region, byte[] row, List<S> result) { + map.put(row, result); + sumVal = ci.add(sumVal, result.get(0)); + sumWeights = ci.add(sumWeights, result.get(1)); + } + } + StdCallback stdCallback = new StdCallback(); + table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call<AggregateService, List<S>>() { + @Override + public List<S> call(AggregateService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse> rpcCallback = + new CoprocessorRpcUtils.BlockingRpcCallback<AggregateResponse>(); + instance.getMedian(controller, requestArg, rpcCallback); + AggregateResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + + List<S> list = new ArrayList<S>(); + for (int i = 0; i < response.getFirstPartCount(); i++) { + ByteString b = response.getFirstPart(i); + T t = getParsedGenericInstance(ci.getClass(), 4, b); + S s = ci.getPromotedValueFromProto(t); + list.add(s); + } + return list; + } + + }, stdCallback); + return stdCallback.getMedianParams(); + } + + /** + * This is the client side interface/handler for calling the median method for a + * given cf-cq combination. This method collects the necessary parameters + * to compute the median and returns the median. + * @param tableName + * @param ci + * @param scan + * @return R the median + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> + R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, + Scan scan) throws Throwable { + try (Table table = connection.getTable(tableName)) { + return median(table, ci, scan); + } + } + + /** + * This is the client side interface/handler for calling the median method for a + * given cf-cq combination. This method collects the necessary parameters + * to compute the median and returns the median. + * @param table + * @param ci + * @param scan + * @return R the median + * @throws Throwable + */ + public <R, S, P extends Message, Q extends Message, T extends Message> + R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, + Scan scan) throws Throwable { + Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan); + byte[] startRow = null; + byte[] colFamily = scan.getFamilies()[0]; + NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily); + NavigableMap<byte[], List<S>> map = p.getFirst(); + S sumVal = p.getSecond().get(0); + S sumWeights = p.getSecond().get(1); + double halfSumVal = ci.divideForAvg(sumVal, 2L); + double movingSumVal = 0; + boolean weighted = false; + if (quals.size() > 1) { + weighted = true; + halfSumVal = ci.divideForAvg(sumWeights, 2L); + } + + for (Map.Entry<byte[], List<S>> entry : map.entrySet()) { + S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0); + double newSumVal = movingSumVal + ci.divideForAvg(s, 1L); + if (newSumVal > halfSumVal) break; // we found the region with the median + movingSumVal = newSumVal; + startRow = entry.getKey(); + } + // scan the region with median and find it + Scan scan2 = new Scan(scan); + // inherit stop row from method parameter + if (startRow != null) scan2.setStartRow(startRow); + ResultScanner scanner = null; + try { + int cacheSize = scan2.getCaching(); + if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) { + scan2.setCacheBlocks(true); + cacheSize = 5; + scan2.setCaching(cacheSize); + } + scanner = table.getScanner(scan2); + Result[] results = null; + byte[] qualifier = quals.pollFirst(); + // qualifier for the weight column + byte[] weightQualifier = weighted ? quals.pollLast() : qualifier; + R value = null; + do { + results = scanner.next(cacheSize); + if (results != null && results.length > 0) { + for (int i = 0; i < results.length; i++) { + Result r = results[i]; + // retrieve weight + Cell kv = r.getColumnLatestCell(colFamily, weightQualifier); + R newValue = ci.getValue(colFamily, weightQualifier, kv); + S s = ci.castToReturnType(newValue); + double newSumVal = movingSumVal + ci.divideForAvg(s, 1L); + // see if we have moved past the median + if (newSumVal > halfSumVal) { + return value; + } + movingSumVal = newSumVal; + kv = r.getColumnLatestCell(colFamily, qualifier); + value = ci.getValue(colFamily, qualifier, kv); + } + } + } while (results != null && results.length > 0); + } finally { + if (scanner != null) { + scanner.close(); + } + } + return null; + } + + <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest + validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent) + throws IOException { + validateParameters(scan, canFamilyBeAbsent); + final AggregateRequest.Builder requestBuilder = + AggregateRequest.newBuilder(); + requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName()); + P columnInterpreterSpecificData = null; + if ((columnInterpreterSpecificData = ci.getRequestData()) + != null) { + requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString()); + } + requestBuilder.setScan(ProtobufUtil.toScan(scan)); + return requestBuilder.build(); + } + + byte[] getBytesFromResponse(ByteString response) { + ByteBuffer bb = response.asReadOnlyByteBuffer(); + bb.rewind(); + byte[] bytes; + if (bb.hasArray()) { + bytes = bb.array(); + } else { + bytes = response.toByteArray(); + } + return bytes; + } + + /** + * Get an instance of the argument type declared in a class's signature. The + * argument type is assumed to be a PB Message subclass, and the instance is + * created using parseFrom method on the passed ByteString. + * @param runtimeClass the runtime type of the class + * @param position the position of the argument in the class declaration + * @param b the ByteString which should be parsed to get the instance created + * @return the instance + * @throws IOException + */ + @SuppressWarnings("unchecked") + // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO. + public static <T extends Message> + T getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b) + throws IOException { + Type type = runtimeClass.getGenericSuperclass(); + Type argType = ((ParameterizedType)type).getActualTypeArguments()[position]; + Class<T> classType = (Class<T>)argType; + T inst; + try { + Method m = classType.getMethod("parseFrom", ByteString.class); + inst = (T)m.invoke(null, b); + return inst; + } catch (SecurityException e) { + throw new IOException(e); + } catch (NoSuchMethodException e) { + throw new IOException(e); + } catch (IllegalArgumentException e) { + throw new IOException(e); + } catch (InvocationTargetException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java new file mode 100644 index 0000000..08b0562 --- /dev/null +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java @@ -0,0 +1,530 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.AggregationClient; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; +import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; +import org.apache.hadoop.hbase.regionserver.InternalScanner; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; + +/** + * A concrete AggregateProtocol implementation. Its system level coprocessor + * that computes the aggregate function at a region level. + * {@link ColumnInterpreter} is used to interpret column value. This class is + * parameterized with the following (these are the types with which the {@link ColumnInterpreter} + * is parameterized, and for more description on these, refer to {@link ColumnInterpreter}): + * @param T Cell value data type + * @param S Promoted data type + * @param P PB message that is used to transport initializer specific bytes + * @param Q PB message that is used to transport Cell (<T>) instance + * @param R PB message that is used to transport Promoted (<S>) instance + */ +@InterfaceAudience.Private +public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message> +extends AggregateService implements CoprocessorService, Coprocessor { + protected static final Log log = LogFactory.getLog(AggregateImplementation.class); + private RegionCoprocessorEnvironment env; + + /** + * Gives the maximum for a given combination of column qualifier and column + * family, in the given row range as defined in the Scan object. In its + * current implementation, it takes one column family and one column qualifier + * (if provided). In case of null column qualifier, maximum value for the + * entire column family will be returned. + */ + @Override + public void getMax(RpcController controller, AggregateRequest request, + RpcCallback<AggregateResponse> done) { + InternalScanner scanner = null; + AggregateResponse response = null; + T max = null; + try { + ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); + T temp; + Scan scan = ProtobufUtil.toScan(request.getScan()); + scanner = env.getRegion().getScanner(scan); + List<Cell> results = new ArrayList<Cell>(); + byte[] colFamily = scan.getFamilies()[0]; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); + byte[] qualifier = null; + if (qualifiers != null && !qualifiers.isEmpty()) { + qualifier = qualifiers.pollFirst(); + } + // qualifier can be null. + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + int listSize = results.size(); + for (int i = 0; i < listSize; i++) { + temp = ci.getValue(colFamily, qualifier, results.get(i)); + max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max; + } + results.clear(); + } while (hasMoreRows); + if (max != null) { + AggregateResponse.Builder builder = AggregateResponse.newBuilder(); + builder.addFirstPart(ci.getProtoForCellType(max).toByteString()); + response = builder.build(); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) {} + } + } + log.info("Maximum from this region is " + + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + max); + done.run(response); + } + + /** + * Gives the minimum for a given combination of column qualifier and column + * family, in the given row range as defined in the Scan object. In its + * current implementation, it takes one column family and one column qualifier + * (if provided). In case of null column qualifier, minimum value for the + * entire column family will be returned. + */ + @Override + public void getMin(RpcController controller, AggregateRequest request, + RpcCallback<AggregateResponse> done) { + AggregateResponse response = null; + InternalScanner scanner = null; + T min = null; + try { + ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); + T temp; + Scan scan = ProtobufUtil.toScan(request.getScan()); + scanner = env.getRegion().getScanner(scan); + List<Cell> results = new ArrayList<Cell>(); + byte[] colFamily = scan.getFamilies()[0]; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); + byte[] qualifier = null; + if (qualifiers != null && !qualifiers.isEmpty()) { + qualifier = qualifiers.pollFirst(); + } + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + int listSize = results.size(); + for (int i = 0; i < listSize; i++) { + temp = ci.getValue(colFamily, qualifier, results.get(i)); + min = (min == null || (temp != null && ci.compare(temp, min) < 0)) ? temp : min; + } + results.clear(); + } while (hasMoreRows); + if (min != null) { + response = AggregateResponse.newBuilder().addFirstPart( + ci.getProtoForCellType(min).toByteString()).build(); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) {} + } + } + log.info("Minimum from this region is " + + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + min); + done.run(response); + } + + /** + * Gives the sum for a given combination of column qualifier and column + * family, in the given row range as defined in the Scan object. In its + * current implementation, it takes one column family and one column qualifier + * (if provided). In case of null column qualifier, sum for the entire column + * family will be returned. + */ + @Override + public void getSum(RpcController controller, AggregateRequest request, + RpcCallback<AggregateResponse> done) { + AggregateResponse response = null; + InternalScanner scanner = null; + long sum = 0l; + try { + ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); + S sumVal = null; + T temp; + Scan scan = ProtobufUtil.toScan(request.getScan()); + scanner = env.getRegion().getScanner(scan); + byte[] colFamily = scan.getFamilies()[0]; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); + byte[] qualifier = null; + if (qualifiers != null && !qualifiers.isEmpty()) { + qualifier = qualifiers.pollFirst(); + } + List<Cell> results = new ArrayList<Cell>(); + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + int listSize = results.size(); + for (int i = 0; i < listSize; i++) { + temp = ci.getValue(colFamily, qualifier, results.get(i)); + if (temp != null) + sumVal = ci.add(sumVal, ci.castToReturnType(temp)); + } + results.clear(); + } while (hasMoreRows); + if (sumVal != null) { + response = AggregateResponse.newBuilder().addFirstPart( + ci.getProtoForPromotedType(sumVal).toByteString()).build(); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) {} + } + } + log.debug("Sum from this region is " + + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + sum); + done.run(response); + } + + /** + * Gives the row count for the given column family and column qualifier, in + * the given row range as defined in the Scan object. + */ + @Override + public void getRowNum(RpcController controller, AggregateRequest request, + RpcCallback<AggregateResponse> done) { + AggregateResponse response = null; + long counter = 0l; + List<Cell> results = new ArrayList<Cell>(); + InternalScanner scanner = null; + try { + Scan scan = ProtobufUtil.toScan(request.getScan()); + byte[][] colFamilies = scan.getFamilies(); + byte[] colFamily = colFamilies != null ? colFamilies[0] : null; + NavigableSet<byte[]> qualifiers = colFamilies != null ? + scan.getFamilyMap().get(colFamily) : null; + byte[] qualifier = null; + if (qualifiers != null && !qualifiers.isEmpty()) { + qualifier = qualifiers.pollFirst(); + } + if (scan.getFilter() == null && qualifier == null) + scan.setFilter(new FirstKeyOnlyFilter()); + scanner = env.getRegion().getScanner(scan); + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + if (results.size() > 0) { + counter++; + } + results.clear(); + } while (hasMoreRows); + ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter); + bb.rewind(); + response = AggregateResponse.newBuilder().addFirstPart( + ByteString.copyFrom(bb)).build(); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) {} + } + } + log.info("Row counter from this region is " + + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter); + done.run(response); + } + + /** + * Gives a Pair with first object as Sum and second object as row count, + * computed for a given combination of column qualifier and column family in + * the given row range as defined in the Scan object. In its current + * implementation, it takes one column family and one column qualifier (if + * provided). In case of null column qualifier, an aggregate sum over all the + * entire column family will be returned. + * <p> + * The average is computed in + * AggregationClient#avg(byte[], ColumnInterpreter, Scan) by + * processing results from all regions, so its "ok" to pass sum and a Long + * type. + */ + @Override + public void getAvg(RpcController controller, AggregateRequest request, + RpcCallback<AggregateResponse> done) { + AggregateResponse response = null; + InternalScanner scanner = null; + try { + ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); + S sumVal = null; + Long rowCountVal = 0l; + Scan scan = ProtobufUtil.toScan(request.getScan()); + scanner = env.getRegion().getScanner(scan); + byte[] colFamily = scan.getFamilies()[0]; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); + byte[] qualifier = null; + if (qualifiers != null && !qualifiers.isEmpty()) { + qualifier = qualifiers.pollFirst(); + } + List<Cell> results = new ArrayList<Cell>(); + boolean hasMoreRows = false; + + do { + results.clear(); + hasMoreRows = scanner.next(results); + int listSize = results.size(); + for (int i = 0; i < listSize; i++) { + sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily, + qualifier, results.get(i)))); + } + rowCountVal++; + } while (hasMoreRows); + if (sumVal != null) { + ByteString first = ci.getProtoForPromotedType(sumVal).toByteString(); + AggregateResponse.Builder pair = AggregateResponse.newBuilder(); + pair.addFirstPart(first); + ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal); + bb.rewind(); + pair.setSecondPart(ByteString.copyFrom(bb)); + response = pair.build(); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) {} + } + } + done.run(response); + } + + /** + * Gives a Pair with first object a List containing Sum and sum of squares, + * and the second object as row count. It is computed for a given combination of + * column qualifier and column family in the given row range as defined in the + * Scan object. In its current implementation, it takes one column family and + * one column qualifier (if provided). The idea is get the value of variance first: + * the average of the squares less the square of the average a standard + * deviation is square root of variance. + */ + @Override + public void getStd(RpcController controller, AggregateRequest request, + RpcCallback<AggregateResponse> done) { + InternalScanner scanner = null; + AggregateResponse response = null; + try { + ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); + S sumVal = null, sumSqVal = null, tempVal = null; + long rowCountVal = 0l; + Scan scan = ProtobufUtil.toScan(request.getScan()); + scanner = env.getRegion().getScanner(scan); + byte[] colFamily = scan.getFamilies()[0]; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); + byte[] qualifier = null; + if (qualifiers != null && !qualifiers.isEmpty()) { + qualifier = qualifiers.pollFirst(); + } + List<Cell> results = new ArrayList<Cell>(); + + boolean hasMoreRows = false; + + do { + tempVal = null; + hasMoreRows = scanner.next(results); + int listSize = results.size(); + for (int i = 0; i < listSize; i++) { + tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily, + qualifier, results.get(i)))); + } + results.clear(); + sumVal = ci.add(sumVal, tempVal); + sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal)); + rowCountVal++; + } while (hasMoreRows); + if (sumVal != null) { + ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString(); + ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString(); + AggregateResponse.Builder pair = AggregateResponse.newBuilder(); + pair.addFirstPart(first_sumVal); + pair.addFirstPart(first_sumSqVal); + ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal); + bb.rewind(); + pair.setSecondPart(ByteString.copyFrom(bb)); + response = pair.build(); + } + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) {} + } + } + done.run(response); + } + + /** + * Gives a List containing sum of values and sum of weights. + * It is computed for the combination of column + * family and column qualifier(s) in the given row range as defined in the + * Scan object. In its current implementation, it takes one column family and + * two column qualifiers. The first qualifier is for values column and + * the second qualifier (optional) is for weight column. + */ + @Override + public void getMedian(RpcController controller, AggregateRequest request, + RpcCallback<AggregateResponse> done) { + AggregateResponse response = null; + InternalScanner scanner = null; + try { + ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request); + S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null; + Scan scan = ProtobufUtil.toScan(request.getScan()); + scanner = env.getRegion().getScanner(scan); + byte[] colFamily = scan.getFamilies()[0]; + NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily); + byte[] valQualifier = null, weightQualifier = null; + if (qualifiers != null && !qualifiers.isEmpty()) { + valQualifier = qualifiers.pollFirst(); + // if weighted median is requested, get qualifier for the weight column + weightQualifier = qualifiers.pollLast(); + } + List<Cell> results = new ArrayList<Cell>(); + + boolean hasMoreRows = false; + + do { + tempVal = null; + tempWeight = null; + hasMoreRows = scanner.next(results); + int listSize = results.size(); + for (int i = 0; i < listSize; i++) { + Cell kv = results.get(i); + tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily, + valQualifier, kv))); + if (weightQualifier != null) { + tempWeight = ci.add(tempWeight, + ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv))); + } + } + results.clear(); + sumVal = ci.add(sumVal, tempVal); + sumWeights = ci.add(sumWeights, tempWeight); + } while (hasMoreRows); + ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString(); + S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights; + ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString(); + AggregateResponse.Builder pair = AggregateResponse.newBuilder(); + pair.addFirstPart(first_sumVal); + pair.addFirstPart(first_sumWeights); + response = pair.build(); + } catch (IOException e) { + CoprocessorRpcUtils.setControllerException(controller, e); + } finally { + if (scanner != null) { + try { + scanner.close(); + } catch (IOException ignored) {} + } + } + done.run(response); + } + + @SuppressWarnings("unchecked") + // Used server-side too by Aggregation Coprocesor Endpoint. Undo this interdependence. TODO. + ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest( + AggregateRequest request) throws IOException { + String className = request.getInterpreterClassName(); + Class<?> cls; + try { + cls = Class.forName(className); + ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance(); + if (request.hasInterpreterSpecificBytes()) { + ByteString b = request.getInterpreterSpecificBytes(); + P initMsg = AggregationClient.getParsedGenericInstance(ci.getClass(), 2, b); + ci.initialize(initMsg); + } + return ci; + } catch (ClassNotFoundException e) { + throw new IOException(e); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } + + @Override + public Service getService() { + return this; + } + + /** + * Stores a reference to the coprocessor environment provided by the + * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this + * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded + * on a table region, so always expects this to be an instance of + * {@link RegionCoprocessorEnvironment}. + * @param env the environment provided by the coprocessor host + * @throws IOException if the provided environment is not an instance of + * {@code RegionCoprocessorEnvironment} + */ + @Override + public void start(CoprocessorEnvironment env) throws IOException { + if (env instanceof RegionCoprocessorEnvironment) { + this.env = (RegionCoprocessorEnvironment)env; + } else { + throw new CoprocessorException("Must be loaded on a table region!"); + } + } + + @Override + public void stop(CoprocessorEnvironment env) throws IOException { + // nothing to do + } + +}