Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 fdf0718b4 -> 2541634da


PHOENIX-4751 Implement client-side hash aggregation


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2541634d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2541634d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2541634d

Branch: refs/heads/4.x-HBase-1.2
Commit: 2541634daeca9be27d08dfc1a9e346716e18bab7
Parents: fdf0718
Author: Gerald Sangudi <gsang...@23andme.com>
Authored: Thu Jun 14 12:49:30 2018 -0700
Committer: Thomas D'Silva <tdsi...@apache.org>
Committed: Thu Aug 2 20:28:51 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/ClientHashAggregateIT.java  | 208 ++++++++++++++++++
 .../phoenix/execute/ClientAggregatePlan.java    |  40 +++-
 .../ClientHashAggregatingResultIterator.java    | 210 +++++++++++++++++++
 .../java/org/apache/phoenix/parse/HintNode.java |   5 +
 4 files changed, 453 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2541634d/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java
new file mode 100644
index 0000000..bdc638b
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+public class ClientHashAggregateIT extends ParallelStatsDisabledIT {
+    
+    /**
+     * Runs the explain-plan and result checks against a salted table.
+     *
+     * The connection is managed by try-with-resources so that an exception
+     * thrown while closing it cannot replace (mask) an assertion failure
+     * raised by the test body.
+     */
+    @Test
+    public void testSalted() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String table = createSalted(conn);
+            testTable(conn, table);
+        }
+    }
+    
+    /**
+     * Runs the explain-plan and result checks against an unsalted table.
+     *
+     * The connection is managed by try-with-resources so that an exception
+     * thrown while closing it cannot replace (mask) an assertion failure
+     * raised by the test body.
+     */
+    @Test
+    public void testUnsalted() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String table = createUnsalted(conn);
+            testTable(conn, table);
+        }
+    }
+
+    /**
+     * Verifies the explain plan and the query results for every combination
+     * of the swap and sort flags, then drops the table.
+     *
+     * @param conn  open connection used for all statements
+     * @param table name of the table under test
+     */
+    private void testTable(Connection conn, String table) throws Exception {
+        final boolean[] flags = { false, true };
+
+        // Explain plans: all four (swap, sort) combinations.
+        for (boolean swap : flags) {
+            for (boolean sort : flags) {
+                verifyExplain(conn, table, swap, sort);
+            }
+        }
+
+        // Results with c2 = 0: all four (swap, sort) combinations.
+        for (boolean swap : flags) {
+            for (boolean sort : flags) {
+                verifyResults(conn, table, 13, 0, swap, sort);
+            }
+        }
+
+        // Results with c2 = 17: only the sorted variants are checked.
+        for (boolean swap : flags) {
+            verifyResults(conn, table, 13, 17, swap, true);
+        }
+
+        dropTable(conn, table);
+    }
+
+    /**
+     * Creates a uniquely named test table with SALT_BUCKETS = 4.
+     *
+     * @param conn open connection used to run the DDL
+     * @return the name of the created table
+     */
+    private String createSalted(Connection conn) throws Exception {
+
+        String table = "SALTED_" + generateUniqueName();
+        String create = "CREATE TABLE " + table + " ("
+            + " keyA BIGINT NOT NULL,"
+            + " keyB BIGINT NOT NULL,"
+            + " val SMALLINT,"
+            + " CONSTRAINT pk PRIMARY KEY (keyA, keyB)"
+            + ") SALT_BUCKETS = 4";
+
+        // Close the statement once the DDL has run instead of leaking it.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(create);
+        }
+        return table;
+    }
+
+    /**
+     * Creates a uniquely named test table without salting.
+     *
+     * @param conn open connection used to run the DDL
+     * @return the name of the created table
+     */
+    private String createUnsalted(Connection conn) throws Exception {
+
+        String table = "UNSALTED_" + generateUniqueName();
+        String create = "CREATE TABLE " + table + " ("
+            + " keyA BIGINT NOT NULL,"
+            + " keyB BIGINT NOT NULL,"
+            + " val SMALLINT,"
+            + " CONSTRAINT pk PRIMARY KEY (keyA, keyB)"
+            + ")";
+
+        // Close the statement once the DDL has run instead of leaking it.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(create);
+        }
+        return table;
+    }
+
+    /**
+     * Builds the self-join GROUP BY query used by the tests.
+     *
+     * @param table table joined with itself as t1 and t2
+     * @param hash  whether to add the HASH_AGGREGATE hint next to
+     *              USE_SORT_MERGE_JOIN
+     * @param swap  whether the GROUP BY lists t2.val before t1.val
+     * @param sort  whether to append ORDER BY t1.val, t2.val
+     * @return the SQL text
+     */
+    private String getQuery(String table, boolean hash, boolean swap,
+                            boolean sort) {
+
+        StringBuilder sql = new StringBuilder("SELECT /*+ USE_SORT_MERGE_JOIN");
+        if (hash) {
+            sql.append(" HASH_AGGREGATE");
+        }
+        sql.append(" */");
+        sql.append(" t1.val v1, t2.val v2, COUNT(*) c");
+        sql.append(" FROM ").append(table).append(" t1 JOIN ")
+            .append(table).append(" t2");
+        sql.append(" ON (t1.keyB = t2.keyB)");
+        sql.append(" WHERE t1.keyA = 10 AND t2.keyA = 20");
+        sql.append(" GROUP BY ");
+        sql.append(swap ? "t2.val, t1.val" : "t1.val, t2.val");
+        if (sort) {
+            sql.append(" ORDER BY t1.val, t2.val");
+        }
+
+        return sql.toString();
+    }
+
+    /**
+     * Asserts that the explain plan of the hinted query contains a
+     * "CLIENT HASH AGGREGATE" step, and that a "CLIENT SORTED BY" step is
+     * present exactly when {@code sort} is true.
+     *
+     * @param conn  open connection used to run EXPLAIN
+     * @param table table under test
+     * @param swap  passed through to {@link #getQuery}
+     * @param sort  passed through to {@link #getQuery}; also the expected
+     *              presence of the client sort step
+     */
+    private void verifyExplain(Connection conn, String table, boolean swap,
+                               boolean sort) throws Exception {
+
+        String query = "EXPLAIN " + getQuery(table, true, swap, sort);
+        String plan;
+        // Statement and ResultSet are closed even if reading the plan throws.
+        try (Statement stmt = conn.createStatement();
+             ResultSet rs = stmt.executeQuery(query)) {
+            plan = QueryUtil.getExplainPlan(rs);
+        }
+
+        assertTrue("no explain plan returned", plan != null);
+        // The plan text is the failure message so a mismatch is diagnosable.
+        assertTrue(plan, plan.contains("CLIENT HASH AGGREGATE"));
+        assertTrue(plan, sort == plan.contains("CLIENT SORTED BY"));
+    }
+
+    /**
+     * Upserts the test rows, then runs the query with and without the
+     * HASH_AGGREGATE hint and checks that both return the same groups.
+     *
+     * For each i in [0, c1) a row (keyA=10, val=1) and a row (keyA=20,
+     * val=2) are written with keyB = 100 + i; for each i in [0, c2) a row
+     * (keyA=10, val=2) and a row (keyA=20, val=1) are written with
+     * keyB = 200 + i. The expected groups are therefore (v1=1, v2=2, c=c1)
+     * followed by (v1=2, v2=1, c=c2), each only when its count is positive.
+     *
+     * @param conn  open connection used for upserts and queries
+     * @param table table under test
+     * @param c1    number of key pairs in the first group
+     * @param c2    number of key pairs in the second group
+     * @param swap  passed through to {@link #getQuery}
+     * @param sort  passed through to {@link #getQuery}
+     */
+    private void verifyResults(Connection conn, String table, int c1, int c2,
+                               boolean swap, boolean sort) throws Exception {
+
+        String upsert = "UPSERT INTO " + table
+            + "(keyA, keyB, val) VALUES(?, ?, ?)";
+        try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+            for (int i = 0; i < c1; i++) {
+                upsertStmt.setInt(1, 10);
+                upsertStmt.setInt(2, 100+i);
+                upsertStmt.setInt(3, 1);
+                upsertStmt.execute();
+
+                upsertStmt.setInt(1, 20);
+                upsertStmt.setInt(2, 100+i);
+                upsertStmt.setInt(3, 2);
+                upsertStmt.execute();
+            }
+            for (int i = 0; i < c2; i++) {
+                upsertStmt.setInt(1, 10);
+                upsertStmt.setInt(2, 200+i);
+                upsertStmt.setInt(3, 2);
+                upsertStmt.execute();
+
+                upsertStmt.setInt(1, 20);
+                upsertStmt.setInt(2, 200+i);
+                upsertStmt.setInt(3, 1);
+                upsertStmt.execute();
+            }
+            conn.commit();
+        }
+
+        String hashQuery = getQuery(table, true, swap, sort);
+        String sortQuery = getQuery(table, false, swap, sort);
+
+        // Each ResultSet gets its own Statement: under the JDBC contract,
+        // re-executing a Statement closes the ResultSet it produced before.
+        try (Statement hashStmt = conn.createStatement();
+             Statement sortStmt = conn.createStatement();
+             ResultSet hrs = hashStmt.executeQuery(hashQuery);
+             ResultSet srs = sortStmt.executeQuery(sortQuery)) {
+            if (c1 > 0) {
+                assertNextGroup(hrs, srs, 1, 2, c1);
+            }
+            if (c2 > 0) {
+                assertNextGroup(hrs, srs, 2, 1, c2);
+            }
+            assertFalse(hrs.next());
+            assertFalse(srs.next());
+        }
+    }
+
+    /**
+     * Advances both result sets by one row and asserts that the hash
+     * aggregate row equals the sort aggregate row and the expected values.
+     */
+    private void assertNextGroup(ResultSet hrs, ResultSet srs, int v1, int v2,
+                                 int count) throws Exception {
+
+        assertTrue(hrs.next());
+        assertTrue(srs.next());
+        // JUnit convention: expected value first, actual value second.
+        assertEquals(srs.getInt("v1"), hrs.getInt("v1"));
+        assertEquals(srs.getInt("v2"), hrs.getInt("v2"));
+        assertEquals(srs.getInt("c"), hrs.getInt("c"));
+        assertEquals(v1, hrs.getInt("v1"));
+        assertEquals(v2, hrs.getInt("v2"));
+        assertEquals(count, hrs.getInt("c"));
+    }
+
+    /**
+     * Drops the given table.
+     *
+     * @param conn  open connection used to run the DDL
+     * @param table name of the table to drop
+     */
+    private void dropTable(Connection conn, String table) throws Exception {
+
+        String drop = "DROP TABLE " + table;
+        // try-with-resources closes the statement even if the DROP fails.
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(drop);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2541634d/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index c306aca..676e8ad 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -46,6 +46,7 @@ import 
org.apache.phoenix.expression.aggregator.ClientAggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.iterate.AggregatingResultIterator;
 import org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.ClientHashAggregatingResultIterator;
 import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
 import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
 import org.apache.phoenix.iterate.FilterResultIterator;
@@ -62,6 +63,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator;
 import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
 import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
@@ -77,6 +79,7 @@ public class ClientAggregatePlan extends ClientProcessingPlan 
{
     private final Expression having;
     private final ServerAggregators serverAggregators;
     private final ClientAggregators clientAggregators;
+    private final boolean useHashAgg;
     
     public ClientAggregatePlan(StatementContext context, FilterableStatement 
statement, TableRef table, RowProjector projector,
             Integer limit, Integer offset, Expression where, OrderBy orderBy, 
GroupBy groupBy, Expression having, QueryPlan delegate) {
@@ -90,6 +93,10 @@ public class ClientAggregatePlan extends 
ClientProcessingPlan {
         // another one.
         this.serverAggregators = 
ServerAggregators.deserialize(context.getScan()
                         .getAttribute(BaseScannerRegionObserver.AGGREGATORS), 
context.getConnection().getQueryServices().getConfiguration(), null);
+
+        // Extract hash aggregate hint, if any.
+        HintNode hints = statement.getHint();
+        useHashAgg = hints != null && 
hints.hasHint(HintNode.Hint.HASH_AGGREGATE);
     }
 
     @Override
@@ -135,17 +142,25 @@ public class ClientAggregatePlan extends 
ClientProcessingPlan {
             aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
             aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
         } else {
-            if (!groupBy.isOrderPreserving()) {
-                int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-                List<Expression> keyExpressions = groupBy.getKeyExpressions();
+            List<Expression> keyExpressions = groupBy.getKeyExpressions();
+            if (groupBy.isOrderPreserving()) {
+                aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+            } else {
+                int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
                 List<OrderByExpression> keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
                 for (Expression keyExpression : keyExpressions) {
                     keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
                 }
-                iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+                if (useHashAgg) {
+                    // Pass in orderBy to apply any sort that has been 
optimized away
+                    aggResultIterator = new 
ClientHashAggregatingResultIterator(context, iterator, serverAggregators, 
keyExpressions, orderBy);
+                } else {
+                    iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+                    aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+                }
             }
-            aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, groupBy.getKeyExpressions());
             aggResultIterator = new 
GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
         }
 
@@ -183,13 +198,18 @@ public class ClientAggregatePlan extends 
ClientProcessingPlan {
         if (where != null) {
             planSteps.add("CLIENT FILTER BY " + where.toString());
         }
-        if (!groupBy.isEmpty()) {
-            if (!groupBy.isOrderPreserving()) {
+        if (groupBy.isEmpty()) {
+            planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
+        } else if (groupBy.isOrderPreserving()) {
+            planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + 
groupBy.getExpressions().toString());
+        } else if (useHashAgg) {
+            planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + 
groupBy.getExpressions().toString());
+            if (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == 
OrderBy.REV_ROW_KEY_ORDER_BY) {
                 planSteps.add("CLIENT SORTED BY " + 
groupBy.getKeyExpressions().toString());
             }
-            planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + 
groupBy.getExpressions().toString());
         } else {
-            planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");            
+            planSteps.add("CLIENT SORTED BY " + 
groupBy.getKeyExpressions().toString());
+            planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + 
groupBy.getExpressions().toString());
         }
         if (having != null) {
             planSteps.add("CLIENT AFTER-AGGREGATION FILTER BY " + 
having.toString());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2541634d/phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
new file mode 100644
index 0000000..a07ea16
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+    implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private final OrderBy orderBy;
+    private final MemoryChunk memoryChunk;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    /**
+     * Creates the iterator and allocates its initial memory chunk of
+     * CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE bytes from the memory manager
+     * reached through {@code context}.
+     *
+     * @param context            statement context; its connection's query
+     *                           services supply the memory manager
+     * @param resultIterator     rows to aggregate; must not be null
+     * @param aggregators        aggregators applied per group; must not be
+     *                           null
+     * @param groupByExpressions expressions forming the grouping key; must
+     *                           not be null
+     * @param orderBy            ordering compared by identity against the
+     *                           row-key order constants; may be null
+     * @throws NullPointerException if a required argument is null
+     */
+    public ClientHashAggregatingResultIterator(StatementContext context,
+            ResultIterator resultIterator, Aggregators aggregators,
+            List<Expression> groupByExpressions, OrderBy orderBy) {
+
+        this.resultIterator = Objects.requireNonNull(resultIterator);
+        this.aggregators = Objects.requireNonNull(aggregators);
+        this.groupByExpressions = Objects.requireNonNull(groupByExpressions);
+        this.orderBy = orderBy;
+        this.memoryChunk = context.getConnection().getQueryServices()
+            .getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+    }
+
+    /**
+     * Returns the next aggregated group, or null when all groups have been
+     * returned. The first call reads every row of the wrapped iterator into
+     * the hash table before the first group is produced.
+     */
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            hash = populateHash();
+
+            /*
+             * A post-aggregation sort is performed only when required:
+             *
+             * (1) No ORDER BY: no sort; groups come back in hash order.
+             * (2) ORDER BY keys match the GROUP BY keys and are all
+             *     ASCENDING: sort here. The ORDER BY was optimized away
+             *     because non-hash client aggregation already produces
+             *     groups in ascending GROUP BY key order.
+             * (3) ORDER BY keys differ from the GROUP BY keys, or at least
+             *     one is DESCENDING: no sort here; the ORDER BY was not
+             *     optimized away and is applied later by the client
+             *     aggregation code.
+             *
+             * The reverse row-key order is handled defensively; reverse
+             * sort is currently not optimized away.
+             */
+            boolean sortByGroupKeys = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY
+                || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY;
+            if (sortByGroupKeys) {
+                keyList = sortKeys();
+                keyIterator = keyList.iterator();
+            } else {
+                keyIterator = hash.keySet().iterator();
+            }
+        }
+
+        if (keyIterator.hasNext()) {
+            ImmutableBytesWritable groupKey = keyIterator.next();
+            byte[] aggValue = aggregators.toBytes(hash.get(groupKey));
+            // The group key is the row key; the aggregate bytes are the value.
+            KeyValue keyValue = KeyValueUtil.newKeyValue(groupKey,
+                SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP,
+                aggValue, 0, aggValue.length);
+            return wrapKeyValueAsResult(keyValue);
+        }
+
+        return null;
+    }
+
+    /**
+     * Drops the references to the aggregation state, closes the memory
+     * chunk, and closes the wrapped iterator. The wrapped iterator is
+     * closed even if closing the memory chunk throws.
+     */
+    @Override
+    public void close() throws SQLException {
+        try {
+            hash = null;
+            keyList = null;
+            keyIterator = null;
+            memoryChunk.close();
+        } finally {
+            resultIterator.close();
+        }
+    }
+
+    /**
+     * Resets the aggregator array returned by
+     * {@code aggregators.getAggregators()}, aggregates {@code result} into
+     * it, and returns that same array.
+     *
+     * @param result the tuple to aggregate
+     * @return the aggregators after aggregating {@code result}
+     */
+    @Override
+    public Aggregator[] aggregate(Tuple result) {
+        final Aggregator[] current = aggregators.getAggregators();
+        aggregators.reset(current);
+        aggregators.aggregate(current, result);
+        return current;
+    }
+
+    /**
+     * Delegates to the wrapped result iterator; this method adds no plan
+     * step of its own.
+     *
+     * @param planSteps list the wrapped iterator appends its steps to
+     */
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+    /**
+     * Returns a description listing the wrapped iterator, the aggregators,
+     * and the GROUP BY expressions.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append("ClientHashAggregatingResultIterator [resultIterator=");
+        text.append(resultIterator);
+        text.append(", aggregators=");
+        text.append(aggregators);
+        text.append(", groupByExpressions=");
+        text.append(groupByExpressions);
+        text.append("]");
+        return text.toString();
+    }
+
+    /**
+     * Evaluates the GROUP BY expressions against {@code tuple} and points
+     * {@code ptr} at the concatenated value.
+     * Copied from ClientGroupedAggregatingResultIterator.
+     *
+     * @param tuple row to compute the grouping key for
+     * @param ptr   writable that is set to the key bytes and returned
+     * @return {@code ptr}
+     * @throws SQLException wrapping any IOException raised while
+     *                      evaluating the expressions
+     */
+    protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
+            ImmutableBytesWritable ptr) throws SQLException {
+        final ImmutableBytesWritable concatenated;
+        try {
+            concatenated =
+                TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+        ptr.set(concatenated.get(), concatenated.getOffset(),
+            concatenated.getLength());
+        return ptr;
+    }
+
+    /**
+     * Wraps a single key value in a tuple.
+     * Copied from ClientGroupedAggregatingResultIterator.
+     *
+     * @param keyValue the key value to wrap
+     * @return a tuple whose only cell is {@code keyValue}
+     */
+    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
+        List<Cell> cells = Collections.<Cell> singletonList(keyValue);
+        return new MultiKeyValueTuple(cells);
+    }
+
+    /**
+     * Reads every row from the wrapped iterator and aggregates it into the
+     * entry for its grouping key, creating the entry on first sight.
+     *
+     * Before a new group is added, the size of the map with that group is
+     * estimated; when the estimate exceeds the tracked chunk size by more
+     * than CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE, the chunk is resized to the
+     * estimate plus CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE.
+     *
+     * @return the populated map, which is also stored in the {@code hash}
+     *         field
+     */
+    private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash()
+            throws SQLException {
+
+        hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(
+            HASH_AGG_INIT_SIZE, 0.75f);
+        final int perGroupAggBytes = aggregators.getEstimatedByteSize();
+        long totalKeyBytes = 0;
+
+        Tuple row;
+        while ((row = resultIterator.next()) != null) {
+            ImmutableBytesWritable groupKey = getGroupingKey(row,
+                new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER));
+            Aggregator[] groupAggregators = hash.get(groupKey);
+
+            if (groupAggregators == null) {
+                // New group: account for its memory before inserting it.
+                totalKeyBytes += groupKey.getSize();
+                long estimate = SizedUtil.sizeOfMap(hash.size() + 1,
+                    SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE, perGroupAggBytes)
+                    + totalKeyBytes;
+                if (estimate > memoryChunk.getSize()
+                        + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE) {
+                    // This will throw InsufficientMemoryException if necessary
+                    memoryChunk.resize(
+                        estimate + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+                }
+
+                groupAggregators = aggregators.newAggregators();
+                hash.put(groupKey, groupAggregators);
+            }
+
+            aggregators.aggregate(groupAggregators, row);
+        }
+
+        return hash;
+    }
+
+    /**
+     * Copies the group keys into a list and sorts them with the
+     * ImmutableBytesWritable comparator, reversed when the order is
+     * REV_ROW_KEY_ORDER_BY. The memory chunk is first grown by the
+     * estimated size of the key list.
+     *
+     * @return the sorted key list, which is also stored in the
+     *         {@code keyList} field
+     */
+    private List<ImmutableBytesWritable> sortKeys() {
+        // This will throw InsufficientMemoryException if necessary
+        memoryChunk.resize(memoryChunk.getSize()
+            + SizedUtil.sizeOfArrayList(hash.size()));
+
+        keyList = new ArrayList<ImmutableBytesWritable>(hash.keySet());
+
+        Comparator<ImmutableBytesWritable> forward =
+            new ImmutableBytesWritable.Comparator();
+        Comparator<ImmutableBytesWritable> ordering =
+            (orderBy == OrderBy.REV_ROW_KEY_ORDER_BY)
+                ? Collections.reverseOrder(forward)
+                : forward;
+        Collections.sort(keyList, ordering);
+        return keyList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2541634d/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index 39e9b05..02a44ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -108,6 +108,11 @@ public class HintNode {
          * Enforces a forward scan.
          */
         FORWARD_SCAN,
+        /**
+         * Prefer a hash aggregate over a sort plus streaming aggregate.
+         * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+         */
+        HASH_AGGREGATE,
     };
 
     private final Map<Hint,String> hints;

Reply via email to