Repository: phoenix
Updated Branches:
  refs/heads/calcite 9677aeb94 -> f14d5a707
PHOENIX-3488 Support COUNT(DISTINCT x) in Phoenix-Calcite Integration Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f14d5a70 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f14d5a70 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f14d5a70 Branch: refs/heads/calcite Commit: f14d5a707011bc6da9e0409b9be71975ad72ea42 Parents: 9677aeb Author: Eric Lomore <eric.lom...@gmail.com> Authored: Thu Dec 1 16:49:34 2016 -0800 Committer: maryannxue <maryann....@gmail.com> Committed: Thu Dec 1 16:49:34 2016 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 46 ++++++++++++++++++++ .../phoenix/calcite/PhoenixPrepareImpl.java | 2 + .../calcite/rel/PhoenixAbstractAggregate.java | 18 +++----- .../calcite/rules/PhoenixConverterRules.java | 7 +-- 4 files changed, 58 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f14d5a70/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index 142fa1b..6154a34 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -867,6 +867,52 @@ public class CalciteIT extends BaseCalciteIT { .close(); } + @Test public void testCountDistinct() throws Exception { + start(false, 1000f).sql("select count(distinct a_string) from aTable") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $2)])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] {{3L}}) + .close(); + + 
start(false, 1000f).sql("select a_string, count(distinct b_string) from atable group by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{2}], EXPR$1=[COUNT(DISTINCT $3)], isOrdered=[false])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(new Object[][] { + {"a", 3L}, + {"b", 3L}, + {"c", 1L}}) + .close(); + + start(false, 1000f).sql("select organization_id, entity_id, count(distinct b_string) from atable group by entity_id ,organization_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT $3)], isOrdered=[true])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], scanOrder=[FORWARD])\n") + .resultIs(new Object[][] { + {"00D300000000XHP", "00A123122312312", 1L}, + {"00D300000000XHP", "00A223122312312", 1L}, + {"00D300000000XHP", "00A323122312312", 1L}, + {"00D300000000XHP", "00A423122312312", 1L}, + {"00D300000000XHP", "00B523122312312", 1L}, + {"00D300000000XHP", "00B623122312312", 1L}, + {"00D300000000XHP", "00B723122312312", 1L}, + {"00D300000000XHP", "00B823122312312", 1L}, + {"00D300000000XHP", "00C923122312312", 1L}}) + .close(); + + start(false, 1000f).sql("select organization_id, count(distinct entity_id), b_string from atable group by organization_id, b_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixClientProject(ORGANIZATION_ID=[$0], EXPR$1=[$2], B_STRING=[$1])\n" + + " PhoenixServerAggregate(group=[{0, 3}], EXPR$1=[COUNT(DISTINCT $1)], isOrdered=[false])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .resultIs(0, new Object[][] { + {"00D300000000XHP", 3L, "b"}, + {"00D300000000XHP", 3L, "c"}, + {"00D300000000XHP", 3L, "e"}}) + .close(); + } + @Test public void testOffset() throws Exception { start(false, 1000f).sql( "select organization_id, entity_id, a_string from aTable offset 3") 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f14d5a70/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java index dc4e29e..1489ae8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixPrepareImpl.java @@ -20,6 +20,7 @@ import org.apache.calcite.prepare.Prepare.Materialization; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.convert.ConverterRule; import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule; import org.apache.calcite.rel.rules.JoinCommuteRule; import org.apache.calcite.rel.rules.SortProjectTransposeRule; import org.apache.calcite.rel.rules.SortUnionTransposeRule; @@ -158,6 +159,7 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl { planner.removeRule(EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE); planner.removeRule(JoinCommuteRule.INSTANCE); + planner.removeRule(AggregateExpandDistinctAggregatesRule.INSTANCE); planner.addRule(JoinCommuteRule.SWAP_OUTER); planner.removeRule(SortUnionTransposeRule.INSTANCE); planner.addRule(SortUnionTransposeRule.MATCH_NULL_FETCH); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f14d5a70/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java index 023e9f1..5520efc 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java @@ -30,6 +30,8 @@ import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.aggregator.ClientAggregators; import org.apache.phoenix.expression.aggregator.ServerAggregators; import org.apache.phoenix.expression.function.AggregateFunction; +import org.apache.phoenix.expression.function.CountAggregateFunction; +import org.apache.phoenix.expression.function.DistinctCountAggregateFunction; import org.apache.phoenix.expression.function.SingleAggregateFunction; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.RowKeyValueAccessor; @@ -82,18 +84,6 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe protected PhoenixAbstractAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); - - for (AggregateCall aggCall : aggCalls) { - if (aggCall.isDistinct()) { - throw new UnsupportedOperationException( "distinct aggregation not supported"); - } - } - switch (getGroupType()) { - case SIMPLE: - break; - default: - throw new UnsupportedOperationException("unsupported group type: " + getGroupType()); - } this.isOrderedGroupBy = isOrderedGroupSet(groupSet, child); } @@ -162,10 +152,14 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe } protected void serializeAggregators(PhoenixRelImplementor implementor, StatementContext context, boolean isEmptyGroupBy) { + if(getGroupType() != Group.SIMPLE) throw new UnsupportedOperationException(); // TODO sort aggFuncs. same problem with group by key sorting. 
List<SingleAggregateFunction> aggFuncs = Lists.newArrayList(); for (AggregateCall call : aggCalls) { AggregateFunction aggFunc = CalciteUtils.toAggregateFunction(call.getAggregation(), call.getArgList(), implementor); + if(aggFunc instanceof CountAggregateFunction && call.isDistinct()){ + aggFunc = new DistinctCountAggregateFunction(aggFunc.getChildren(), null); + } if (!(aggFunc instanceof SingleAggregateFunction)) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f14d5a70/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java index ee444fd..68db713 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java @@ -37,6 +37,7 @@ import org.apache.calcite.rel.logical.LogicalTableModify; import org.apache.calcite.rel.logical.LogicalUnion; import org.apache.calcite.rel.logical.LogicalValues; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; import org.apache.phoenix.calcite.CalciteUtils; import org.apache.phoenix.calcite.PhoenixTable; import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate; @@ -899,13 +900,13 @@ public class PhoenixConverterRules { if (input.getGroupSets().size() > 1) return false; - if (input.containsDistinctCall()) - return false; - if (input.getGroupType() != Group.SIMPLE) return false; for (AggregateCall aggCall : input.getAggCallList()) { + if(!SqlKind.COUNT.equals(aggCall.getAggregation().getKind()) && aggCall.isDistinct()) { + return false; + } if (!CalciteUtils.isAggregateFunctionSupported(aggCall.getAggregation())) { return false; }