Github user geraldss commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/308#discussion_r203926206
  
    --- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
    @@ -0,0 +1,173 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.iterate;
    +
    +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
    +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
    +
    +import java.io.IOException;
    +import java.sql.SQLException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    +import org.apache.phoenix.expression.Expression;
    +import org.apache.phoenix.expression.aggregator.Aggregator;
    +import org.apache.phoenix.expression.aggregator.Aggregators;
    +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
    +import org.apache.phoenix.schema.tuple.Tuple;
    +import org.apache.phoenix.util.KeyValueUtil;
    +import org.apache.phoenix.util.TupleUtil;
    +
    +/**
    + * 
    + * This class implements client-side hash aggregation in memory.
    + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
    + * 
    + */
    +public class ClientHashAggregatingResultIterator
    +    implements AggregatingResultIterator {
    +
    +    private static final int HASH_AGG_INIT_SIZE = 64*1024;
    +    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
    +    private final ResultIterator resultIterator;
    +    private final Aggregators aggregators;
    +    private final List<Expression> groupByExpressions;
    +    private final int thresholdBytes;
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
    +    private List<ImmutableBytesWritable> keyList;
    +    private Iterator<ImmutableBytesWritable> keyIterator;
    +
    +    public ClientHashAggregatingResultIterator(ResultIterator 
resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, 
int thresholdBytes) {
    +        Objects.requireNonNull(resultIterator);
    +        Objects.requireNonNull(aggregators);
    +        Objects.requireNonNull(groupByExpressions);
    +        this.resultIterator = resultIterator;
    +        this.aggregators = aggregators;
    +        this.groupByExpressions = groupByExpressions;
    +        this.thresholdBytes = thresholdBytes;
    +    }
    +
    +    @Override
    +        public Tuple next() throws SQLException {
    +        if (keyIterator == null) {
    +            hash = populateHash();
    +            keyList = sortKeys();
    +            keyIterator = keyList.iterator();
    +        }
    +
    +        if (!keyIterator.hasNext()) {
    +            return null;
    +        }
    +
    +        ImmutableBytesWritable key = keyIterator.next();
    +        Aggregator[] rowAggregators = hash.get(key);
    +        byte[] value = aggregators.toBytes(rowAggregators);
    +        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
    +        return tuple;
    +    }
    +
    +    @Override
    +        public void close() throws SQLException {
    +        keyIterator = null;
    +        keyList = null;
    +        hash = null;
    +        resultIterator.close();
    +    }
    +
    +    @Override
    +        public Aggregator[] aggregate(Tuple result) {
    +        Aggregator[] rowAggregators = aggregators.getAggregators();
    +        aggregators.reset(rowAggregators);
    +        aggregators.aggregate(rowAggregators, result);
    +        return rowAggregators;
    +    }
    +
    +    @Override
    +        public void explain(List<String> planSteps) {
    +        resultIterator.explain(planSteps);
    +    }
    +
    +    @Override
    +        public String toString() {
    +        return "ClientHashAggregatingResultIterator [resultIterator="
    +            + resultIterator + ", aggregators=" + aggregators + ", 
groupByExpressions="
    +            + groupByExpressions + "]";
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, 
ImmutableBytesWritable ptr) throws SQLException {
    +        try {
    +            ImmutableBytesWritable key = 
TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
    +            ptr.set(key.get(), key.getOffset(), key.getLength());
    +            return ptr;
    +        } catch (IOException e) {
    +            throw new SQLException(e);
    +        }
    +    }
    +
    +    // Copied from ClientGroupedAggregatingResultIterator
    +    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
    +        return new MultiKeyValueTuple(Collections.<Cell> 
singletonList(keyValue));
    +    }
    +
    +    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() 
throws SQLException {
    +        hash = new HashMap<ImmutableBytesWritable, 
Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f);
    +
    +        final int aggSize = aggregators.getEstimatedByteSize();
    +        int hashSize = 0;
    --- End diff --
    
    Done.


---

Reply via email to