Repository: phoenix Updated Branches: refs/heads/calcite 8097c8b9d -> b01bdd172
Passed another two tests: aggregate+project, aggregate+join Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b01bdd17 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b01bdd17 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b01bdd17 Branch: refs/heads/calcite Commit: b01bdd1721ca6b2425c9011bda09c339128459e4 Parents: 8097c8b Author: maryannxue <wei....@intel.com> Authored: Thu Apr 2 15:13:47 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Thu Apr 2 15:13:47 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteTest.java | 26 ++++++ .../phoenix/calcite/PhoenixAggregate.java | 84 ++++++++------------ .../org/apache/phoenix/calcite/PhoenixJoin.java | 2 +- .../apache/phoenix/calcite/PhoenixProject.java | 33 ++------ .../org/apache/phoenix/calcite/PhoenixRel.java | 5 ++ .../calcite/PhoenixRelImplementorImpl.java | 36 +++++++++ .../phoenix/calcite/PhoenixTableScan.java | 6 +- 7 files changed, 113 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java index 70d44f6..333315c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java @@ -256,6 +256,7 @@ public class CalciteTest extends BaseClientManagedTimeIT { {"00A323122312312", "a", "00D300000000XHP"}, {"00A423122312312", "a", "00D300000000XHP"}}) .close(); + start().sql("select t1.entity_id, t2.a_string, t3.organization_id from aTable t1 join aTable t2 on t1.entity_id = t2.entity_id and t1.organization_id = t2.organization_id join atable t3 on t1.entity_id = t3.entity_id and t1.organization_id = t3.organization_id") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixProject(ENTITY_ID=[$19], A_STRING=[$2], ORGANIZATION_ID=[$36])\n" + @@ -287,6 +288,31 @@ public class CalciteTest extends BaseClientManagedTimeIT { {"b", 4L}, {"c", 1L}}) .close(); + + start().sql("select count(entity_id), a_string from atable group by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixProject(EXPR$0=[$1], A_STRING=[$0])\n" + + " PhoenixAggregate(group=[{0}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], project=[[$2]])\n") + .resultIs(new Object[][] { + {4L, "a"}, + {4L, "b"}, + {1L, "c"}}) + .close(); + + start().sql("select s.name, count(\"item_id\") from " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s join " + JOIN_ITEM_TABLE_FULL_NAME + " i on s.\"supplier_id\" = i.\"supplier_id\" group by s.name") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixAggregate(group=[{0}], EXPR$1=[COUNT()])\n" + + " PhoenixProject(NAME=[$1])\n" + + " PhoenixJoin(condition=[=($0, $2)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, SUPPLIERTABLE]], project=[[$0, $1]])\n" + + " PhoenixTableScan(table=[[phoenix, ITEMTABLE]], project=[[$5]])\n") + .resultIs(new Object[][] { + {"S1", 2L}, + {"S2", 2L}, + {"S5", 1L}, + {"S6", 1L}}) + .close(); } @Test public void testSubquery() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java index 7a38f25..0c620c8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixAggregate.java @@ -7,6 +7,7 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.InvalidRelException; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Aggregate; @@ -27,7 +28,8 @@ import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.ClientAggregatePlan; import org.apache.phoenix.execute.HashJoinPlan; import org.apache.phoenix.execute.ScanPlan; -import org.apache.phoenix.expression.CoerceExpression; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.aggregator.ClientAggregators; @@ -36,12 +38,9 @@ import org.apache.phoenix.expression.function.AggregateFunction; import org.apache.phoenix.expression.function.SingleAggregateFunction; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.schema.types.PDecimal; -import org.apache.phoenix.schema.types.PVarchar; - import com.google.common.collect.Lists; /** @@ -49,6 +48,8 @@ import com.google.common.collect.Lists; * relational expression in Phoenix. */ public class PhoenixAggregate extends Aggregate implements PhoenixRel { + private static double SERVER_AGGREGATE_FACTOR = 0.2; + public PhoenixAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) throws InvalidRelException { super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); assert getConvention() == PhoenixRel.CONVENTION; @@ -68,7 +69,11 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel { @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR); + RelOptCost cost = super.computeSelfCost(planner); + if (isServerAggregate()) { + cost = cost.multiplyBy(SERVER_AGGREGATE_FACTOR); + } + return cost.multiplyBy(PHOENIX_FACTOR); } @Override @@ -117,25 +122,13 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel { String groupExprAttribName = BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS; // TODO sort group by keys. not sure if there is a way to avoid this sorting, // otherwise we would have add an extra projection. - List<Expression> exprs = Lists.newArrayListWithExpectedSize(ordinals.size()); - List<Expression> keyExprs = exprs; + // TODO convert key types. can be avoided? + List<Expression> keyExprs = Lists.newArrayListWithExpectedSize(ordinals.size()); for (int i = 0; i < ordinals.size(); i++) { Expression expr = implementor.newColumnExpression(ordinals.get(i)); - exprs.add(expr); - PDataType keyType = getKeyType(expr); - if (keyType == expr.getDataType()) { - continue; - } - if (keyExprs == exprs) { - keyExprs = Lists.newArrayList(exprs); - } - try { - keyExprs.set(i, CoerceExpression.create(expr, keyType)); - } catch (SQLException e) { - throw new RuntimeException(e); - } + keyExprs.add(expr); } - GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(exprs).setKeyExpressions(keyExprs).build(); + GroupBy groupBy = new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(keyExprs).setKeyExpressions(keyExprs).build(); // TODO sort aggFuncs. same problem with group by key sorting. List<SingleAggregateFunction> aggFuncs = Lists.newArrayList(); @@ -152,46 +145,39 @@ public class PhoenixAggregate extends Aggregate implements PhoenixRel { context.getAggregationManager().setAggregators(clientAggregators); SelectStatement select = SelectStatement.SELECT_STAR; - RowProjector rowProjector = createRowProjector(keyExprs, aggFuncs); + QueryPlan aggPlan; if (basePlan == null) { - return new ClientAggregatePlan(context, select, tableRef, rowProjector, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan); + aggPlan = new ClientAggregatePlan(context, select, tableRef, implementor.createRowProjector(), null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan); + } else { + aggPlan = new AggregatePlan(context, select, basePlan.getTableRef(), implementor.createRowProjector(), null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null); + if (plan instanceof HashJoinPlan) { + HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; + aggPlan = HashJoinPlan.create(select, aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); + } } - QueryPlan aggPlan = new AggregatePlan(context, select, basePlan.getTableRef(), rowProjector, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null); - if (plan instanceof ScanPlan) - return aggPlan; - - HashJoinPlan hashJoinPlan = (HashJoinPlan) plan; - return HashJoinPlan.create(select, aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); - } - - private static RowProjector createRowProjector(List<Expression> keyExprs, List<SingleAggregateFunction> aggFuncs) { - List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + List<Expression> exprs = Lists.newArrayList(); for (int i = 0; i < keyExprs.size(); i++) { Expression keyExpr = keyExprs.get(i); RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExprs, i); Expression expr = new RowKeyColumnExpression(keyExpr, accessor, keyExpr.getDataType()); - columnProjectors.add(new ExpressionProjector(expr.toString(), "", expr, false)); + exprs.add(expr); } for (SingleAggregateFunction aggFunc : aggFuncs) { - columnProjectors.add(new ExpressionProjector(aggFunc.toString(), "", aggFunc, false)); + exprs.add(aggFunc); } - return new RowProjector(columnProjectors, 0, false); + TupleProjector tupleProjector = implementor.project(exprs); + PTable projectedTable = implementor.createProjectedTable(); + implementor.setTableRef(new TableRef(projectedTable)); + return new TupleProjectionPlan(aggPlan, tupleProjector, null, implementor.createRowProjector()); } - private static PDataType getKeyType(Expression expression) { - PDataType type = expression.getDataType(); - if (!expression.isNullable() || !type.isFixedWidth()) { - return type; - } - if (type.isCastableTo(PDecimal.INSTANCE)) { - return PDecimal.INSTANCE; - } - if (type.isCastableTo(PVarchar.INSTANCE)) { - return PVarchar.INSTANCE; + public boolean isServerAggregate() { + RelNode rel = getInput(); + if (rel instanceof RelSubset) { + rel = ((RelSubset) rel).getBest(); } - // This might happen if someone tries to group by an array - throw new IllegalStateException("Multiple occurrences of type " + type + " may not occur in a GROUP BY clause"); + return (rel instanceof PhoenixTableScan) || (rel instanceof PhoenixJoin && ((PhoenixJoin) rel).isHashJoinDoable()); } private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java index e5f9cda..a1384a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixJoin.java @@ -123,7 +123,7 @@ public class PhoenixJoin extends Join implements PhoenixRel { return HashJoinPlan.create(SelectStatement.SELECT_STAR, leftPlan, hashJoinInfo, new HashJoinPlan.HashSubPlan[] {new HashJoinPlan.HashSubPlan(0, rightPlan, rightExprs, false, null, null)}); } - private boolean isHashJoinDoable() { + public boolean isHashJoinDoable() { // TODO check memory limit RelNode rel = getLeft(); if (rel instanceof RelSubset) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java index b1ca8fa..6b82f42 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java @@ -54,36 +54,13 @@ public class PhoenixProject extends Project implements PhoenixRel { assert getConvention() == getInput().getConvention(); QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); - TupleProjector tupleProjector = project(implementor, getProjects()); + List<Expression> exprs = Lists.newArrayList(); + for (RexNode project : getProjects()) { + exprs.add(CalciteUtils.toExpression(project, implementor)); + } + TupleProjector tupleProjector = implementor.project(exprs); PTable projectedTable = implementor.createProjectedTable(); implementor.setTableRef(new TableRef(projectedTable)); return new TupleProjectionPlan(plan, tupleProjector, null, implementor.createRowProjector()); } - - protected static TupleProjector project(Implementor implementor, List<RexNode> projects) { - KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); - Expression[] exprs = new Expression[projects.size()]; - List<PColumn> columns = Lists.<PColumn>newArrayList(); - for (int i = 0; i < projects.size(); i++) { - String name = projects.get(i).toString(); - Expression expr = CalciteUtils.toExpression(projects.get(i), implementor); - builder.addField(expr); - exprs[i] = expr; - columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), - expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), - i, expr.getSortOrder(), null, null, false, name)); - } - try { - PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, - PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, - null, null, columns, null, null, Collections.<PTable>emptyList(), - false, Collections.<PName>emptyList(), null, null, false, false, false, null, - null, null); - implementor.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false)); - } catch (SQLException e) { - throw new RuntimeException(e); - } - - return new TupleProjector(builder.build(), exprs); - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java index 27a7b0e..d89cdab 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java @@ -1,10 +1,14 @@ package org.apache.phoenix.calcite; +import java.util.List; + import org.apache.calcite.plan.Convention; import org.apache.calcite.rel.RelNode; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.Expression; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; @@ -51,5 +55,6 @@ public interface PhoenixRel extends RelNode { ImplementorContext getCurrentContext(); PTable createProjectedTable(); RowProjector createRowProjector(); + TupleProjector project(List<Expression> exprs); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java index ef92f34..67e1fd0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java @@ -1,20 +1,30 @@ package org.apache.phoenix.calcite; import java.sql.SQLException; +import java.util.Collections; import java.util.List; import java.util.Stack; +import org.apache.hadoop.hbase.HConstants; import org.apache.phoenix.calcite.PhoenixRel.ImplementorContext; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ExpressionProjector; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.TupleProjectionCompiler; +import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnImpl; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import com.google.common.collect.Lists; @@ -89,5 +99,31 @@ class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { // TODO get estimate row size return new RowProjector(columnProjectors, 0, false); } + + @Override + public TupleProjector project(List<Expression> exprs) { + KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); + List<PColumn> columns = Lists.<PColumn>newArrayList(); + for (int i = 0; i < exprs.size(); i++) { + Expression expr = exprs.get(i); + String name = expr.toString(); + builder.addField(expr); + columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), + expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(), + i, expr.getSortOrder(), null, null, false, name)); + } + try { + PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, + PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, + null, null, columns, null, null, Collections.<PTable>emptyList(), + false, Collections.<PName>emptyList(), null, null, false, false, false, null, + null, null); + this.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b01bdd17/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java index f681c88..e21d28f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTableScan.java @@ -121,7 +121,11 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { if (projects == null) { tupleProjector = createTupleProjector(implementor, phoenixTable.getTable()); } else { - tupleProjector = PhoenixProject.project(implementor, this.projects); + List<Expression> exprs = Lists.newArrayList(); + for (RexNode project : this.projects) { + exprs.add(CalciteUtils.toExpression(project, implementor)); + } + tupleProjector = implementor.project(exprs); } TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); PTable projectedTable = implementor.createProjectedTable();