Github user JamesRTaylor commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203867850

--- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ */
+public class ClientHashAggregatingResultIterator
+    implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final byte[] UNINITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private final int thresholdBytes;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
+        Objects.requireNonNull(resultIterator);
+        Objects.requireNonNull(aggregators);
+        Objects.requireNonNull(groupByExpressions);
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.groupByExpressions = groupByExpressions;
+        this.thresholdBytes = thresholdBytes;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            hash = populateHash();
+            keyList = sortKeys();
+            keyIterator = keyList.iterator();
+        }
+
+        if (!keyIterator.hasNext()) {
+            return null;
+        }
+
+        ImmutableBytesWritable key = keyIterator.next();
+        Aggregator[] rowAggregators = hash.get(key);
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        return tuple;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        keyIterator = null;
+        keyList = null;
+        hash = null;
+        resultIterator.close();
+    }
+
+    @Override
+    public Aggregator[] aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientHashAggregatingResultIterator [resultIterator="
+            + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
+            + groupByExpressions + "]";
+    }
+
+    // Copied from ClientGroupedAggregatingResultIterator
+    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
+        try {
+            ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
+            ptr.set(key.get(), key.getOffset(), key.getLength());
+            return ptr;
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+
+    // Copied from ClientGroupedAggregatingResultIterator
+    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
+        return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
+    }
+
+    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
+        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
+
+        final int aggSize = aggregators.getEstimatedByteSize();
+        int hashSize = 0;
+
+        for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
+            ImmutableBytesWritable key = new ImmutableBytesWritable(UNINITIALIZED_KEY_BUFFER);
+            key = getGroupingKey(result, key);
+            Aggregator[] rowAggregators = hash.get(key);
+            if (rowAggregators == null) {
+                // Abort if we exceed memory threshold/2.
+                // We use threshold/2 to leave room for the subsequent sort.
+                if (thresholdBytes > 0) {
+                    hashSize += key.getSize() + aggSize;
--- End diff --

Track memory used as you're doing, but use SizedUtil.sizeOfMap() to account for the overhead of the HashMap, and then call memoryChunk.resize(hashSize) whenever the tracked size grows by another CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE. The reason to do it this way is that if there are lots of other threads running memory-intensive queries, you'll get a failure earlier, before a potential OOM exception (which is not recoverable) can happen.
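
As a rough sketch (not from the PR) of what this suggestion could look like inside populateHash(): here the memoryChunk field is assumed to be a MemoryManager.MemoryChunk allocated up front from the connection's memory manager (e.g. in the constructor), and CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE is a hypothetical constant for the resize increment; SizedUtil.sizeOfMap() is the existing Phoenix utility for estimating map overhead.

    // Sketch only. Assumes imports of org.apache.phoenix.util.SizedUtil and
    // org.apache.phoenix.memory.MemoryManager.MemoryChunk, a memoryChunk field
    // allocated from the connection's MemoryManager, and a hypothetical
    // CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE constant (e.g. 1 MB).
    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException {
        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
        final int aggSize = aggregators.getEstimatedByteSize();
        long keySize = 0;

        for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) {
            ImmutableBytesWritable key = new ImmutableBytesWritable(UNINITIALIZED_KEY_BUFFER);
            key = getGroupingKey(result, key);
            Aggregator[] rowAggregators = hash.get(key);
            if (rowAggregators == null) {
                keySize += key.getSize();
                // sizeOfMap() includes the HashMap's own per-entry overhead,
                // not just the payload bytes.
                long hashSize = SizedUtil.sizeOfMap(hash.size() + 1, key.getSize(), aggSize) + keySize;
                // Grow the reserved chunk in CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE
                // increments. resize() throws InsufficientMemoryException when
                // the global pool is exhausted, so a heavily loaded client fails
                // fast here instead of with an unrecoverable OutOfMemoryError.
                if (hashSize > memoryChunk.getSize() + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE) {
                    memoryChunk.resize(hashSize);
                }
                rowAggregators = aggregators.newAggregators();
                hash.put(key, rowAggregators);
            }
            aggregators.aggregate(rowAggregators, result);
        }
        return hash;
    }

With this approach, close() would also need to call memoryChunk.close() to release the reservation, and the thresholdBytes/2 abort in the diff could go away, since the MemoryManager enforces the global client-side limit.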
---