Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r203467413 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * Rows from the child iterator are drained into an in-memory HashMap keyed by the concatenated GROUP BY values on the first call to next(); the grouping keys are then sorted (sortKeys() is defined later in the file, outside this excerpt) and one aggregated row is emitted per key. + */ +public class ClientHashAggregatingResultIterator + implements AggregatingResultIterator { + + /* Initial capacity for the aggregation hash -- presumably used when constructing the HashMap in populateHash(), whose body is outside this excerpt; TODO confirm. */ private static final int HASH_AGG_INIT_SIZE = 64*1024; + /* NOTE(review): "UNITIALIZED" is a typo for "UNINITIALIZED". Its use site is outside this excerpt, so a rename must update all references together. */ private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; + private final ResultIterator resultIterator; + private final Aggregators aggregators; + private final List<Expression> groupByExpressions; + /* Mutable state: null until the first next() call builds it; close() resets all three to null. */ private HashMap<ImmutableBytesWritable, Aggregator[]> hash; + private List<ImmutableBytesWritable> keyList; + private Iterator<ImmutableBytesWritable> keyIterator; + + /* All three collaborators are required; fails fast on null. NOTE(review): messageless NPEs are hard to diagnose -- consider Objects.requireNonNull(arg, "arg") instead. */ public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) { + if (resultIterator == null) throw new NullPointerException(); + if (aggregators == null) throw new NullPointerException(); + if (groupByExpressions == null) throw new NullPointerException(); + this.resultIterator = resultIterator; + this.aggregators = aggregators; + this.groupByExpressions = groupByExpressions; + } + + /* Returns the next aggregated row, or null once all grouping keys are exhausted. The first call performs the entire aggregation (populateHash) and the key sort, so it bears the full cost of draining the child iterator. */ @Override + public Tuple next() throws SQLException { + if (keyIterator == null) { + populateHash(); + sortKeys(); + keyIterator = keyList.iterator(); + } + + if (!keyIterator.hasNext()) { + return null; + } + + ImmutableBytesWritable key = keyIterator.next(); + Aggregator[] rowAggregators = hash.get(key); + /* Serialize the per-group aggregator state and wrap it as a single KeyValue using the single-column aggregate constants. */ byte[] value = aggregators.toBytes(rowAggregators); + Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); + return tuple; + } + + /* Drops references to the hash state (making it collectible) before closing the delegate iterator. */ @Override + public void close() throws SQLException { + keyIterator = null; + keyList = null; + hash = null; + resultIterator.close(); + } + + /* Aggregates a single result tuple into the Aggregator array obtained from aggregators.getAggregators(); the array is reset first, so on return it reflects only this tuple. */ @Override + public Aggregator[] aggregate(Tuple result) { + Aggregator[] rowAggregators = aggregators.getAggregators(); + aggregators.reset(rowAggregators); + aggregators.aggregate(rowAggregators, result); + return rowAggregators; + } + + /* Delegates plan explanation to the child iterator; this node adds no step of its own here. */ @Override + public void explain(List<String> planSteps) { + 
resultIterator.explain(planSteps); + } + + /* Debug representation; includes the delegate iterator and the grouping configuration. */ @Override + public String toString() { + return "ClientHashAggregatingResultIterator [resultIterator=" + + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions=" + + groupByExpressions + "]"; + } + + /* Evaluates and concatenates the GROUP BY expressions for one input tuple; the result is placed in ptr (which is also returned). An IOException from value concatenation is rewrapped as SQLException. */ // Copied from ClientGroupedAggregatingResultIterator + protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException { + try { + ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions); + /* NOTE(review): ptr.set(...) aliases key's backing byte array rather than copying it -- verify callers do not reuse or mutate that buffer while ptr is live. */ ptr.set(key.get(), key.getOffset(), key.getLength()); + return ptr; + } catch (IOException e) { + throw new SQLException(e); + } + } + + /* Wraps one aggregated KeyValue as a single-cell Tuple result row. */ // Copied from ClientGroupedAggregatingResultIterator + protected Tuple wrapKeyValueAsResult(KeyValue keyValue) { + return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue)); + } + + /* Declaration only -- the body of populateHash() continues past the end of this excerpt. */ private void populateHash() throws SQLException { --- End diff -- @JamesRTaylor @solzy I have added a memory limitation based on SPOOL_THRESHOLD_BYTES. It throws an exception similar to HashCacheClient. Please review. Thanks.
---