Repository: phoenix Updated Branches: refs/heads/4.x-cdh5.11 0901175ce -> c760ac54b
PHOENIX-4751 Implement client-side hash aggregation Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c760ac54 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c760ac54 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c760ac54 Branch: refs/heads/4.x-cdh5.11 Commit: c760ac54b4da5686b885c481a2b9ed8b3c60f9e9 Parents: 0901175 Author: Gerald Sangudi <gsang...@23andme.com> Authored: Thu Jun 14 12:49:30 2018 -0700 Committer: Thomas D'Silva <tdsi...@apache.org> Committed: Thu Aug 2 20:30:23 2018 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/ClientHashAggregateIT.java | 208 ++++++++++++++++++ .../phoenix/execute/ClientAggregatePlan.java | 40 +++- .../ClientHashAggregatingResultIterator.java | 210 +++++++++++++++++++ .../java/org/apache/phoenix/parse/HintNode.java | 5 + 4 files changed, 453 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c760ac54/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java new file mode 100644 index 0000000..bdc638b --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.junit.Test; + +public class ClientHashAggregateIT extends ParallelStatsDisabledIT { + + @Test + public void testSalted() throws Exception { + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + + try { + String table = createSalted(conn); + testTable(conn, table); + } finally { + conn.close(); + } + } + + @Test + public void testUnsalted() throws Exception { + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + + try { + String table = createUnsalted(conn); + testTable(conn, table); + } finally { + conn.close(); + } + } + + private void testTable(Connection conn, String table) throws Exception { + verifyExplain(conn, table, false, false); + verifyExplain(conn, table, false, true); + verifyExplain(conn, table, true, false); + verifyExplain(conn, table, true, true); + + verifyResults(conn, table, 13, 0, false, false); + verifyResults(conn, table, 13, 0, false, true); + verifyResults(conn, table, 13, 0, true, false); + verifyResults(conn, table, 13, 0, true, true); + + verifyResults(conn, table, 13, 17, false, true); + verifyResults(conn, table, 13, 17, true, true); + + dropTable(conn, table); + } + + private String createSalted(Connection conn) throws Exception { + + String table = "SALTED_" + generateUniqueName(); + String create = "CREATE TABLE " + table + " (" + + " keyA BIGINT NOT NULL," + + " keyB BIGINT NOT NULL," + + " val SMALLINT," + + " CONSTRAINT pk PRIMARY KEY (keyA, keyB)" + + ") SALT_BUCKETS = 4"; + + conn.createStatement().execute(create); + return table; + } + + private String createUnsalted(Connection conn) throws Exception { + + String table = "UNSALTED_" + generateUniqueName(); + String create = "CREATE TABLE " + table + " (" + + " keyA BIGINT NOT NULL," + + " keyB BIGINT NOT NULL," + + " val SMALLINT," + + " CONSTRAINT pk PRIMARY KEY (keyA, keyB)" + + ")"; + + conn.createStatement().execute(create); + return table; + } + + private String getQuery(String table, boolean hash, boolean swap, boolean sort) { + + String query = "SELECT /*+ USE_SORT_MERGE_JOIN" + + (hash ? " HASH_AGGREGATE" : "") + " */" + + " t1.val v1, t2.val v2, COUNT(*) c" + + " FROM " + table + " t1 JOIN " + table + " t2" + + " ON (t1.keyB = t2.keyB)" + + " WHERE t1.keyA = 10 AND t2.keyA = 20" + + " GROUP BY " + + (swap ? "t2.val, t1.val" : "t1.val, t2.val") + + (sort ? " ORDER BY t1.val, t2.val" : "") + ; + + return query; + } + + private void verifyExplain(Connection conn, String table, boolean swap, boolean sort) throws Exception { + + String query = "EXPLAIN " + getQuery(table, true, swap, sort); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(query); + String plan = QueryUtil.getExplainPlan(rs); + rs.close(); + assertTrue(plan != null && plan.contains("CLIENT HASH AGGREGATE")); + assertTrue(plan != null && (sort == plan.contains("CLIENT SORTED BY"))); + } + + private void verifyResults(Connection conn, String table, int c1, int c2, boolean swap, boolean sort) throws Exception { + + String upsert = "UPSERT INTO " + table + "(keyA, keyB, val) VALUES(?, ?, ?)"; + PreparedStatement upsertStmt = conn.prepareStatement(upsert); + for (int i = 0; i < c1; i++) { + upsertStmt.setInt(1, 10); + upsertStmt.setInt(2, 100+i); + upsertStmt.setInt(3, 1); + upsertStmt.execute(); + + upsertStmt.setInt(1, 20); + upsertStmt.setInt(2, 100+i); + upsertStmt.setInt(3, 2); + upsertStmt.execute(); + } + for (int i = 0; i < c2; i++) { + upsertStmt.setInt(1, 10); + upsertStmt.setInt(2, 200+i); + upsertStmt.setInt(3, 2); + upsertStmt.execute(); + + upsertStmt.setInt(1, 20); + upsertStmt.setInt(2, 200+i); + upsertStmt.setInt(3, 1); + upsertStmt.execute(); + } + conn.commit(); + + String hashQuery = getQuery(table, true, swap, sort); + String sortQuery = getQuery(table, false, swap, sort); + Statement stmt = conn.createStatement(); + ResultSet hrs = stmt.executeQuery(hashQuery); + ResultSet srs = stmt.executeQuery(sortQuery); + + try { + if (c1 > 0) { + assertTrue(hrs.next()); + assertTrue(srs.next()); + assertEquals(hrs.getInt("v1"), srs.getInt("v1")); + assertEquals(hrs.getInt("v2"), srs.getInt("v2")); + assertEquals(hrs.getInt("c"), srs.getInt("c")); + assertEquals(hrs.getInt("v1"), 1); + assertEquals(hrs.getInt("v2"), 2); + assertEquals(hrs.getInt("c"), c1); + } + if (c2 > 0) { + assertTrue(hrs.next()); + assertTrue(srs.next()); + assertEquals(hrs.getInt("v1"), srs.getInt("v1")); + assertEquals(hrs.getInt("v2"), srs.getInt("v2")); + assertEquals(hrs.getInt("c"), srs.getInt("c")); + assertEquals(hrs.getInt("v1"), 2); + assertEquals(hrs.getInt("v2"), 1); + assertEquals(hrs.getInt("c"), c2); + } + assertFalse(hrs.next()); + assertFalse(srs.next()); + } finally { + hrs.close(); + srs.close(); + } + } + + private void dropTable(Connection conn, String table) throws Exception { + + String drop = "DROP TABLE " + table; + Statement stmt = conn.createStatement(); + stmt.execute(drop); + stmt.close(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c760ac54/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java index c306aca..676e8ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java @@ -46,6 +46,7 @@ import org.apache.phoenix.expression.aggregator.ClientAggregators; import org.apache.phoenix.expression.aggregator.ServerAggregators; import org.apache.phoenix.iterate.AggregatingResultIterator; import org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator; +import org.apache.phoenix.iterate.ClientHashAggregatingResultIterator; import org.apache.phoenix.iterate.DistinctAggregatingResultIterator; import org.apache.phoenix.iterate.FilterAggregatingResultIterator; import org.apache.phoenix.iterate.FilterResultIterator; @@ -62,6 +63,7 @@ import org.apache.phoenix.iterate.SequenceResultIterator; import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator; import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.FilterableStatement; +import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; @@ -77,6 +79,7 @@ public class ClientAggregatePlan extends ClientProcessingPlan { private final Expression having; private final ServerAggregators serverAggregators; private final ClientAggregators clientAggregators; + private final boolean useHashAgg; public ClientAggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, Expression where, OrderBy orderBy, GroupBy groupBy, Expression having, QueryPlan delegate) { @@ -90,6 +93,10 @@ public class ClientAggregatePlan extends ClientProcessingPlan { // another one. this.serverAggregators = ServerAggregators.deserialize(context.getScan() .getAttribute(BaseScannerRegionObserver.AGGREGATORS), context.getConnection().getQueryServices().getConfiguration(), null); + + // Extract hash aggregate hint, if any. + HintNode hints = statement.getHint(); + useHashAgg = hints != null && hints.hasHint(HintNode.Hint.HASH_AGGREGATE); } @Override @@ -135,17 +142,25 @@ public class ClientAggregatePlan extends ClientProcessingPlan { aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators); aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } else { - if (!groupBy.isOrderPreserving()) { - int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( - QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); - List<Expression> keyExpressions = groupBy.getKeyExpressions(); + List<Expression> keyExpressions = groupBy.getKeyExpressions(); + if (groupBy.isOrderPreserving()) { + aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); + } else { + int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt + (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size()); for (Expression keyExpression : keyExpressions) { keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true)); } - iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); + + if (useHashAgg) { + // Pass in orderBy to apply any sort that has been optimized away + aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, orderBy); + } else { + iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); + aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); + } } - aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getKeyExpressions()); aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } @@ -183,13 +198,18 @@ public class ClientAggregatePlan extends ClientProcessingPlan { if (where != null) { planSteps.add("CLIENT FILTER BY " + where.toString()); } - if (!groupBy.isEmpty()) { - if (!groupBy.isOrderPreserving()) { + if (groupBy.isEmpty()) { + planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW"); + } else if (groupBy.isOrderPreserving()) { + planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString()); + } else if (useHashAgg) { + planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString()); + if (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) { planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString()); } - planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString()); } else { - planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW"); + planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString()); + planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString()); } if (having != null) { planSteps.add("CLIENT AFTER-AGGREGATION FILTER BY " + having.toString()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c760ac54/phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java new file mode 100644 index 0000000..a07ea16 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.SizedUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. + * + */ +public class ClientHashAggregatingResultIterator + implements AggregatingResultIterator { + + private static final int HASH_AGG_INIT_SIZE = 64*1024; + private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024; + private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; + private final ResultIterator resultIterator; + private final Aggregators aggregators; + private final List<Expression> groupByExpressions; + private final OrderBy orderBy; + private final MemoryChunk memoryChunk; + private HashMap<ImmutableBytesWritable, Aggregator[]> hash; + private List<ImmutableBytesWritable> keyList; + private Iterator<ImmutableBytesWritable> keyIterator; + + public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, + Aggregators aggregators, List<Expression> groupByExpressions, OrderBy orderBy) { + + Objects.requireNonNull(resultIterator); + Objects.requireNonNull(aggregators); + Objects.requireNonNull(groupByExpressions); + this.resultIterator = resultIterator; + this.aggregators = aggregators; + this.groupByExpressions = groupByExpressions; + this.orderBy = orderBy; + memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE); + } + + @Override + public Tuple next() throws SQLException { + if (keyIterator == null) { + hash = populateHash(); + /******** + * + * Perform a post-aggregation sort only when required. There are 3 possible scenarios: + * (1) The query DOES NOT have an ORDER BY -- in this case, we DO NOT perform a sort, and the results will be in random order. + * (2) The query DOES have an ORDER BY, the ORDER BY keys match the GROUP BY keys, and all the ORDER BY keys are ASCENDING + * -- in this case, we DO perform a sort. THE ORDER BY has been optimized away, because the non-hash client aggregation + * generates results in ascending order of the GROUP BY keys. + * (3) The query DOES have an ORDER BY, but the ORDER BY keys do not match the GROUP BY keys, or at least one ORDER BY key is DESCENDING + * -- in this case, we DO NOT perform a sort, because the ORDER BY has not been optimized away and will be performed later by the + * client aggregation code. + * + * Finally, we also handle optimization of reverse sort here. This is currently defensive, because reverse sort is not optimized away. + * + ********/ + if (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) { + keyList = sortKeys(); + keyIterator = keyList.iterator(); + } else { + keyIterator = hash.keySet().iterator(); + } + } + + if (!keyIterator.hasNext()) { + return null; + } + + ImmutableBytesWritable key = keyIterator.next(); + Aggregator[] rowAggregators = hash.get(key); + byte[] value = aggregators.toBytes(rowAggregators); + Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); + return tuple; + } + + @Override + public void close() throws SQLException { + keyIterator = null; + keyList = null; + hash = null; + try { + memoryChunk.close(); + } finally { + resultIterator.close(); + } + } + + @Override + public Aggregator[] aggregate(Tuple result) { + Aggregator[] rowAggregators = aggregators.getAggregators(); + aggregators.reset(rowAggregators); + aggregators.aggregate(rowAggregators, result); + return rowAggregators; + } + + @Override + public void explain(List<String> planSteps) { + resultIterator.explain(planSteps); + } + + @Override + public String toString() { + return "ClientHashAggregatingResultIterator [resultIterator=" + + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions=" + + groupByExpressions + "]"; + } + + // Copied from ClientGroupedAggregatingResultIterator + protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException { + try { + ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions); + ptr.set(key.get(), key.getOffset(), key.getLength()); + return ptr; + } catch (IOException e) { + throw new SQLException(e); + } + } + + // Copied from ClientGroupedAggregatingResultIterator + protected Tuple wrapKeyValueAsResult(KeyValue keyValue) { + return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue)); + } + + private HashMap<ImmutableBytesWritable, Aggregator[]> populateHash() throws SQLException { + + hash = new HashMap<ImmutableBytesWritable, Aggregator[]>(HASH_AGG_INIT_SIZE, 0.75f); + final int aggSize = aggregators.getEstimatedByteSize(); + long keySize = 0; + + for (Tuple result = resultIterator.next(); result != null; result = resultIterator.next()) { + ImmutableBytesWritable key = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER); + key = getGroupingKey(result, key); + Aggregator[] rowAggregators = hash.get(key); + if (rowAggregators == null) { + keySize += key.getSize(); + long hashSize = SizedUtil.sizeOfMap(hash.size() + 1, SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE, aggSize) + keySize; + if (hashSize > memoryChunk.getSize() + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE) { + // This will throw InsufficientMemoryException if necessary + memoryChunk.resize(hashSize + CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE); + } + + rowAggregators = aggregators.newAggregators(); + hash.put(key, rowAggregators); + } + + aggregators.aggregate(rowAggregators, result); + } + + return hash; + } + + private List<ImmutableBytesWritable> sortKeys() { + // This will throw InsufficientMemoryException if necessary + memoryChunk.resize(memoryChunk.getSize() + SizedUtil.sizeOfArrayList(hash.size())); + + keyList = new ArrayList<ImmutableBytesWritable>(hash.size()); + keyList.addAll(hash.keySet()); + Comparator<ImmutableBytesWritable> comp = new ImmutableBytesWritable.Comparator(); + if (orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) { + comp = Collections.reverseOrder(comp); + } + Collections.sort(keyList, comp); + return keyList; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c760ac54/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java index 39e9b05..02a44ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java @@ -108,6 +108,11 @@ public class HintNode { * Enforces a forward scan. */ FORWARD_SCAN, + /** + * Prefer a hash aggregate over a sort plus streaming aggregate. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. + */ + HASH_AGGREGATE, }; private final Map<Hint,String> hints;