Sync with Phoenix master
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e0dc286a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e0dc286a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e0dc286a Branch: refs/heads/calcite Commit: e0dc286a4655dd5a704044601a498b2aecc395f7 Parents: 760723a 776eea9 Author: maryannxue <maryann....@gmail.com> Authored: Fri Apr 8 11:40:40 2016 -0400 Committer: maryannxue <maryann....@gmail.com> Committed: Fri Apr 8 11:40:40 2016 -0400 ---------------------------------------------------------------------- .../apache/phoenix/end2end/AutoCommitIT.java | 14 +- .../apache/phoenix/end2end/CreateTableIT.java | 2 +- .../phoenix/end2end/CsvBulkLoadToolIT.java | 32 +++ .../apache/phoenix/end2end/DerivedTableIT.java | 144 +++++++++- .../apache/phoenix/end2end/GroupByCaseIT.java | 82 ++++++ .../org/apache/phoenix/end2end/HashJoinIT.java | 170 +++++++++++- .../phoenix/end2end/QueryWithOffsetIT.java | 211 +++++++++++++++ .../org/apache/phoenix/end2end/ReadOnlyIT.java | 12 +- .../apache/phoenix/end2end/SortMergeJoinIT.java | 44 +++ .../phoenix/end2end/StatsCollectorIT.java | 6 + .../org/apache/phoenix/end2end/SubqueryIT.java | 6 +- .../end2end/SubqueryUsingSortMergeJoinIT.java | 6 +- .../end2end/index/IndexExpressionIT.java | 6 +- .../phoenix/end2end/index/LocalIndexIT.java | 2 +- .../phoenix/end2end/index/ViewIndexIT.java | 72 ++++- phoenix-core/src/main/antlr3/PhoenixSQL.g | 16 +- .../calcite/rel/PhoenixAbstractAggregate.java | 7 +- .../calcite/rel/PhoenixClientAggregate.java | 2 +- .../phoenix/calcite/rel/PhoenixClientJoin.java | 2 +- .../calcite/rel/PhoenixClientProject.java | 2 +- .../phoenix/calcite/rel/PhoenixClientSort.java | 2 +- .../phoenix/calcite/rel/PhoenixFilter.java | 2 +- .../phoenix/calcite/rel/PhoenixLimit.java | 2 +- .../calcite/rel/PhoenixMergeSortUnion.java | 2 +- .../calcite/rel/PhoenixServerAggregate.java | 2 +- .../phoenix/calcite/rel/PhoenixTableScan.java | 3 +- .../phoenix/calcite/rel/PhoenixUnion.java | 2 +- .../phoenix/calcite/rel/PhoenixValues.java | 2 +- .../apache/phoenix/compile/DeleteCompiler.java | 13 +- .../apache/phoenix/compile/GroupByCompiler.java | 265 +++++++++++-------- .../apache/phoenix/compile/JoinCompiler.java | 10 +- .../phoenix/compile/ListJarsQueryPlan.java | 5 + .../apache/phoenix/compile/OffsetCompiler.java | 114 ++++++++ .../apache/phoenix/compile/OrderByCompiler.java | 3 +- .../apache/phoenix/compile/PostDDLCompiler.java | 3 +- .../apache/phoenix/compile/QueryCompiler.java | 59 +++-- .../org/apache/phoenix/compile/QueryPlan.java | 2 + .../phoenix/compile/StatementNormalizer.java | 2 +- .../phoenix/compile/SubqueryRewriter.java | 10 +- .../phoenix/compile/SubselectRewriter.java | 17 +- .../apache/phoenix/compile/TraceQueryPlan.java | 5 + .../apache/phoenix/compile/UpsertCompiler.java | 2 +- .../coprocessor/BaseScannerRegionObserver.java | 1 + .../phoenix/coprocessor/ScanRegionObserver.java | 95 ++++++- .../apache/phoenix/execute/AggregatePlan.java | 53 ++-- .../apache/phoenix/execute/BaseQueryPlan.java | 9 +- .../phoenix/execute/ClientAggregatePlan.java | 25 +- .../phoenix/execute/ClientProcessingPlan.java | 10 +- .../apache/phoenix/execute/ClientScanPlan.java | 40 ++- .../apache/phoenix/execute/CorrelatePlan.java | 2 +- .../phoenix/execute/DegenerateQueryPlan.java | 2 +- .../phoenix/execute/DelegateQueryPlan.java | 4 + .../execute/LiteralResultIterationPlan.java | 15 +- .../org/apache/phoenix/execute/ScanPlan.java | 64 +++-- .../phoenix/execute/SortMergeJoinPlan.java | 7 +- .../org/apache/phoenix/execute/UnionPlan.java | 17 +- .../apache/phoenix/execute/UnnestArrayPlan.java | 2 +- .../phoenix/iterate/BaseResultIterators.java | 19 +- .../apache/phoenix/iterate/ExplainTable.java | 16 +- .../phoenix/iterate/LimitingResultIterator.java | 2 +- .../iterate/MergeSortTopNResultIterator.java | 21 +- .../phoenix/iterate/OffsetResultIterator.java | 62 +++++ .../OrderedAggregatingResultIterator.java | 6 +- .../phoenix/iterate/OrderedResultIterator.java | 58 ++-- .../phoenix/iterate/ParallelIterators.java | 4 +- .../apache/phoenix/iterate/SerialIterators.java | 19 +- .../phoenix/iterate/TableResultIterator.java | 22 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 18 +- .../org/apache/phoenix/join/HashJoinInfo.java | 2 +- .../phoenix/mapreduce/CsvBulkLoadTool.java | 3 +- .../apache/phoenix/optimize/QueryOptimizer.java | 2 +- .../apache/phoenix/parse/DeleteStatement.java | 6 + .../phoenix/parse/FilterableStatement.java | 1 + .../org/apache/phoenix/parse/OffsetNode.java | 67 +++++ .../apache/phoenix/parse/ParseNodeFactory.java | 55 ++-- .../apache/phoenix/parse/ParseNodeRewriter.java | 2 +- .../apache/phoenix/parse/SelectStatement.java | 35 ++- .../apache/phoenix/query/QueryConstants.java | 6 + .../apache/phoenix/schema/types/PDouble.java | 4 +- .../org/apache/phoenix/schema/types/PFloat.java | 4 +- .../apache/phoenix/schema/types/PInteger.java | 2 +- .../org/apache/phoenix/schema/types/PLong.java | 4 +- .../org/apache/phoenix/util/PhoenixRuntime.java | 8 +- .../java/org/apache/phoenix/util/QueryUtil.java | 30 ++- .../java/org/apache/phoenix/util/ScanUtil.java | 4 + .../phoenix/compile/QueryCompilerTest.java | 35 ++- .../phoenix/execute/CorrelatePlanTest.java | 39 ++- .../execute/LiteralResultIteratorPlanTest.java | 192 ++++++++++++++ .../phoenix/execute/UnnestArrayPlanTest.java | 3 +- .../query/ParallelIteratorsSplitTest.java | 5 + .../apache/phoenix/spark/PhoenixSparkIT.scala | 7 +- .../phoenix/spark/ConfigurationUtil.scala | 27 +- .../org/apache/phoenix/spark/PhoenixRDD.scala | 13 +- 93 files changed, 2087 insertions(+), 427 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java index dff5d4e,0000000..6d53db5 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java @@@ -1,213 -1,0 +1,210 @@@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelFieldCollation; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.TableMapping; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.expression.aggregator.ClientAggregators; +import org.apache.phoenix.expression.aggregator.ServerAggregators; +import org.apache.phoenix.expression.function.AggregateFunction; +import org.apache.phoenix.expression.function.SingleAggregateFunction; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.RowKeyValueAccessor; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Aggregate} + * relational expression in Phoenix. + */ +abstract public class PhoenixAbstractAggregate extends Aggregate implements PhoenixRel { + + public static boolean isSingleValueCheckAggregate(Aggregate aggregate) { + List<AggregateCall> aggCalls = aggregate.getAggCallList(); + if (aggCalls.size() != 1) + return false; + + AggregateCall call = aggCalls.get(0); + return call.getAggregation().getName().equals("SINGLE_VALUE"); + } + + protected static boolean isOrderedGroupSet(ImmutableBitSet groupSet, RelNode child) { + if (groupSet.isEmpty()) { + return true; + } + + Set<Integer> ordinals = Sets.newHashSet(groupSet.asList()); + List<RelCollation> collations = child.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE); + for (int i = 0; i < collations.size(); i++) { + int count = 0; + List<RelFieldCollation> fieldCollations = collations.get(i).getFieldCollations(); + if (fieldCollations.size() < ordinals.size()) { + continue; + } + for (RelFieldCollation fieldCollation : fieldCollations.subList(0, ordinals.size())) { + if (ordinals.contains(fieldCollation.getFieldIndex())) { + count++; + } + } + if (count == ordinals.size()) { + return true; + } + } + + return false; + } + + public final boolean isOrderedGroupBy; + + protected PhoenixAbstractAggregate(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + + for (AggregateCall aggCall : aggCalls) { + if (aggCall.isDistinct()) { + throw new UnsupportedOperationException( "distinct aggregation not supported"); + } + } + switch (getGroupType()) { + case SIMPLE: + break; + default: + throw new UnsupportedOperationException("unsupported group type: " + getGroupType()); + } + + this.isOrderedGroupBy = isOrderedGroupSet(groupSet, child); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + if (isSingleValueCheckAggregate(this)) + return planner.getCostFactory().makeInfiniteCost(); + + double rowCount = mq.getRowCount(this); + double spoolSize = 0; + if (!isOrderedGroupBy) { + double bytesPerRow = mq.getAverageRowSize(this); + spoolSize = rowCount * bytesPerRow * 6 /* map size */; + } + // Aggregates with more aggregate functions cost a bit more + float multiplier = 1f + (float) aggCalls.size() * 0.125f; + for (AggregateCall aggCall : aggCalls) { + if (aggCall.getAggregation().getName().equals("SUM")) { + // Pretend that SUM costs a little bit more than $SUM0, + // to make things deterministic. + multiplier += 0.0125f; + } + } + return planner.getCostFactory().makeCost(rowCount * multiplier + spoolSize, 0, 0); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("isOrdered", isOrderedGroupBy, !groupSet.isEmpty()); + } + + protected ImmutableIntList getColumnRefList() { + List<Integer> columnRefList = Lists.newArrayList(); + for (ImmutableBitSet set : groupSets) { + columnRefList.addAll(set.asList()); + } + // TODO filterArg?? + for (AggregateCall call : aggCalls) { + columnRefList.addAll(call.getArgList()); + } + return ImmutableIntList.copyOf(columnRefList); + } + + protected GroupBy getGroupBy(Implementor implementor) { + if (groupSets.size() > 1) { + throw new UnsupportedOperationException(); + } + + List<Integer> ordinals = groupSet.asList(); + if (ordinals.isEmpty()) { + return GroupBy.EMPTY_GROUP_BY; + } + - String groupExprAttribName = isOrderedGroupBy? - BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS - : BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS; + // TODO sort group by keys. not sure if there is a way to avoid this sorting, - // otherwise we would have add an extra projection. ++ // otherwise we would have to add an extra projection. + // TODO convert key types. can be avoided? + List<Expression> keyExprs = Lists.newArrayListWithExpectedSize(ordinals.size()); + for (int i = 0; i < ordinals.size(); i++) { + Expression expr = implementor.newColumnExpression(ordinals.get(i)); + keyExprs.add(expr); + } + - return new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(keyExprs).setKeyExpressions(keyExprs).build(); ++ return new GroupBy.GroupByBuilder().setIsOrderPreserving(isOrderedGroupBy).setExpressions(keyExprs).setKeyExpressions(keyExprs).build(); + } + + protected void serializeAggregators(Implementor implementor, StatementContext context, boolean isEmptyGroupBy) { + // TODO sort aggFuncs. same problem with group by key sorting. + List<SingleAggregateFunction> aggFuncs = Lists.newArrayList(); + for (AggregateCall call : aggCalls) { + AggregateFunction aggFunc = CalciteUtils.toAggregateFunction(call.getAggregation(), call.getArgList(), implementor); + if (!(aggFunc instanceof SingleAggregateFunction)) { + throw new UnsupportedOperationException(); + } + aggFuncs.add((SingleAggregateFunction) aggFunc); + } + int minNullableIndex = getMinNullableIndex(aggFuncs, isEmptyGroupBy); + context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex)); + ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex); + context.getAggregationManager().setAggregators(clientAggregators); + } + + protected static QueryPlan wrapWithProject(Implementor implementor, QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> aggFuncs) { + List<Expression> exprs = Lists.newArrayList(); + for (int i = 0; i < keyExpressions.size(); i++) { + Expression keyExpr = keyExpressions.get(i); + RowKeyValueAccessor accessor = new RowKeyValueAccessor(keyExpressions, i); + Expression expr = new RowKeyColumnExpression(keyExpr, accessor, keyExpr.getDataType()); + exprs.add(expr); + } + for (SingleAggregateFunction aggFunc : aggFuncs) { + exprs.add(aggFunc); + } + + TupleProjector tupleProjector = implementor.project(exprs); + PTable projectedTable = implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns); + implementor.setTableMapping(new TableMapping(projectedTable)); + return new TupleProjectionPlan(plan, tupleProjector, null); + } + + private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) { + int minNullableIndex = aggFuncs.size(); + for (int i = 0; i < aggFuncs.size(); i++) { + SingleAggregateFunction aggFunc = aggFuncs.get(i); + if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) { + minNullableIndex = i; + break; + } + } + return minNullableIndex; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java index 680e871,0000000..164d486 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java @@@ -1,82 -1,0 +1,82 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.ClientAggregatePlan; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.TableRef; + +public class PhoenixClientAggregate extends PhoenixAbstractAggregate { + + public static PhoenixClientAggregate create(RelNode input, boolean indicator, + ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, + List<AggregateCall> aggCalls) { + RelOptCluster cluster = input.getCluster(); + RelTraitSet traits = cluster.traitSetOf(PhoenixConvention.CLIENT); + return new PhoenixClientAggregate(cluster, traits, input, indicator, + groupSet, groupSets, aggCalls); + } + + private PhoenixClientAggregate(RelOptCluster cluster, RelTraitSet traits, + RelNode child, boolean indicator, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + } + + @Override + public PhoenixClientAggregate copy(RelTraitSet traits, RelNode input, + boolean indicator, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { + return create(input, indicator, groupSet, groupSets, aggregateCalls); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + if (!getInput().getConvention().satisfies(PhoenixConvention.CLIENT)) + return planner.getCostFactory().makeInfiniteCost(); + + return super.computeSelfCost(planner, mq) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList())); + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + implementor.popContext(); + + TableRef tableRef = implementor.getTableMapping().getTableRef(); + PhoenixStatement stmt = plan.getContext().getStatement(); + StatementContext context; + try { + context = new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + GroupBy groupBy = super.getGroupBy(implementor); + super.serializeAggregators(implementor, context, groupBy.isEmpty()); + - QueryPlan aggPlan = new ClientAggregatePlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan); ++ QueryPlan aggPlan = new ClientAggregatePlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan); + + return PhoenixAbstractAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions())); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java index 53c0cf4,0000000..cb7dc54 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java @@@ -1,163 -1,0 +1,163 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.TableMapping; +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.JoinCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.execute.SortMergeJoinPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; + +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; + +public class PhoenixClientJoin extends PhoenixAbstractJoin { + + public static PhoenixClientJoin create(final RelNode left, final RelNode right, + RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType, + boolean isSingleValueRhs) { + final RelOptCluster cluster = left.getCluster(); + final JoinInfo joinInfo = JoinInfo.of(left, right, condition); + final RelMetadataQuery mq = RelMetadataQuery.instance(); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixConvention.CLIENT) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return PhoenixRelMdCollation.mergeJoin(mq, left, right, joinInfo.leftKeys, joinInfo.rightKeys); + } + }); + return new PhoenixClientJoin(cluster, traits, left, right, condition, variablesSet, joinType, isSingleValueRhs); + } + + private PhoenixClientJoin(RelOptCluster cluster, RelTraitSet traits, + RelNode left, RelNode right, RexNode condition, + Set<CorrelationId> variablesSet,JoinRelType joinType, boolean isSingleValueRhs) { + super(cluster, traits, left, right, condition, variablesSet, + joinType, isSingleValueRhs); + } + + @Override + public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, RelNode left, + RelNode right, JoinRelType joinRelType, boolean semiJoinDone) { + return copy(traits, condition, left, right, joinRelType, semiJoinDone, isSingleValueRhs); + } + + @Override + public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, RelNode left, + RelNode right, JoinRelType joinRelType, boolean semiJoinDone, boolean isSingleValueRhs) { + return create(left, right, condition, variablesSet, joinRelType, isSingleValueRhs); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + if (!getLeft().getConvention().satisfies(PhoenixConvention.GENERIC) + || !getRight().getConvention().satisfies(PhoenixConvention.GENERIC) + || !variablesSet.isEmpty()) + return planner.getCostFactory().makeInfiniteCost(); + + if (joinType == JoinRelType.RIGHT + || (!joinInfo.leftKeys.isEmpty() && !RelCollations.contains(mq.collations(getLeft()), joinInfo.leftKeys)) + || (!joinInfo.rightKeys.isEmpty() && !RelCollations.contains(mq.collations(getRight()), joinInfo.rightKeys))) + return planner.getCostFactory().makeInfiniteCost(); + + double rowCount = mq.getRowCount(this); + + double leftRowCount = mq.getRowCount(getLeft()); + if (Double.isInfinite(leftRowCount)) { + rowCount = leftRowCount; + } else { + rowCount += leftRowCount; + double rightRowCount = mq.getRowCount(getRight()); + if (Double.isInfinite(rightRowCount)) { + rowCount = rightRowCount; + } else { + rowCount += rightRowCount; + } + } + RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0); + + return cost.multiplyBy(SERVER_FACTOR).multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + List<Expression> leftExprs = Lists.<Expression> newArrayList(); + List<Expression> rightExprs = Lists.<Expression> newArrayList(); + + implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns && getJoinType() != JoinRelType.FULL, true, getColumnRefList(0))); + QueryPlan leftPlan = implementInput(implementor, 0, leftExprs); + PTable leftTable = implementor.getTableMapping().getPTable(); + implementor.popContext(); + + implementor.pushContext(new ImplementorContext(false, true, getColumnRefList(1))); + QueryPlan rightPlan = implementInput(implementor, 1, rightExprs); + PTable rightTable = implementor.getTableMapping().getPTable(); + implementor.popContext(); + + JoinType type = CalciteUtils.convertJoinType(getJoinType()); + PTable joinedTable; + try { + joinedTable = JoinCompiler.joinProjectedTables(leftTable, rightTable, type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + TableMapping tableMapping = new TableMapping(joinedTable); + implementor.setTableMapping(tableMapping); + TableRef tableRef = tableMapping.getTableRef(); + ColumnResolver resolver; + try { + resolver = FromCompiler.getResolver(tableRef); + } catch (SQLException e) { + throw new RuntimeException(e); + } + PhoenixStatement stmt = leftPlan.getContext().getStatement(); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + + QueryPlan plan = new SortMergeJoinPlan(context, leftPlan.getStatement(), + tableRef, type, leftPlan, rightPlan, leftExprs, rightExprs, + joinedTable, leftTable, rightTable, + leftTable.getColumns().size() - leftTable.getPKColumns().size(), + isSingleValueRhs); + + RexNode postFilter = joinInfo.getRemaining(getCluster().getRexBuilder()); + Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : CalciteUtils.toExpression(postFilter, implementor); + if (postFilter != null) { + plan = new ClientScanPlan(context, plan.getStatement(), tableRef, - RowProjector.EMPTY_PROJECTOR, null, postFilterExpr, ++ RowProjector.EMPTY_PROJECTOR, null, null, postFilterExpr, + OrderBy.EMPTY_ORDER_BY, plan); + } + + return plan; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java index 799513d,0000000..a49ceb5 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java @@@ -1,101 -1,0 +1,101 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMdCollation; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixSequence; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.execute.TupleProjectionPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.Sequence; + +import com.google.common.base.Supplier; + +public class PhoenixClientProject extends PhoenixAbstractProject { + + public static PhoenixClientProject create(final RelNode input, + final List<? extends RexNode> projects, RelDataType rowType) { + final RelOptCluster cluster = input.getCluster(); + final RelMetadataQuery mq = RelMetadataQuery.instance(); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixConvention.CLIENT) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return RelMdCollation.project(mq, input, projects); + } + }); + return new PhoenixClientProject(cluster, traits, input, projects, rowType); + } + + private PhoenixClientProject(RelOptCluster cluster, RelTraitSet traits, + RelNode input, List<? extends RexNode> projects, RelDataType rowType) { + super(cluster, traits, input, projects, rowType); + } + + @Override + public PhoenixClientProject copy(RelTraitSet traits, RelNode input, + List<RexNode> projects, RelDataType rowType) { + return create(input, projects, rowType); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + if (!getInput().getConvention().satisfies(PhoenixConvention.GENERIC)) + return planner.getCostFactory().makeInfiniteCost(); + + return super.computeSelfCost(planner, mq) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList())); + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + implementor.popContext(); + + + PhoenixSequence sequence = CalciteUtils.findSequence(this); + final SequenceManager seqManager = sequence == null ? + null : new SequenceManager(new PhoenixStatement(sequence.pc)); + implementor.setSequenceManager(seqManager); + TupleProjector tupleProjector = project(implementor); + if (seqManager != null) { + try { + seqManager.validateSequences(Sequence.ValueOp.VALIDATE_SEQUENCE); + StatementContext context = new StatementContext( + plan.getContext().getStatement(), + plan.getContext().getResolver(), + new Scan(), seqManager); + plan = new ClientScanPlan( + context, plan.getStatement(), plan.getTableRef(), - RowProjector.EMPTY_PROJECTOR, null, null, ++ RowProjector.EMPTY_PROJECTOR, null, null, null, + OrderBy.EMPTY_ORDER_BY, plan); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + return new TupleProjectionPlan(plan, tupleProjector, null); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java index 7c73530,0000000..16c0a81 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java @@@ -1,76 -1,0 +1,76 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.TableRef; + +public class PhoenixClientSort extends PhoenixAbstractSort { + + public static PhoenixClientSort create(RelNode input, RelCollation collation) { + RelOptCluster cluster = input.getCluster(); + collation = RelCollationTraitDef.INSTANCE.canonize(collation); + RelTraitSet traits = + input.getTraitSet().replace(PhoenixConvention.CLIENT).replace(collation); + return new PhoenixClientSort(cluster, traits, input, collation); + } + + private PhoenixClientSort(RelOptCluster cluster, RelTraitSet traits, + RelNode child, RelCollation collation) { + super(cluster, traits, child, collation); + } + + @Override + public PhoenixClientSort copy(RelTraitSet traitSet, RelNode newInput, + RelCollation newCollation, RexNode offset, RexNode fetch) { + return create(newInput, newCollation); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + if (!getInput().getConvention().satisfies(PhoenixConvention.CLIENT)) + return planner.getCostFactory().makeInfiniteCost(); + + return super.computeSelfCost(planner, mq) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + if (this.offset != null) + throw new UnsupportedOperationException(); + + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + + TableRef tableRef = implementor.getTableMapping().getTableRef(); + PhoenixStatement stmt = plan.getContext().getStatement(); + StatementContext context; + try { + context = new StatementContext(stmt, FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + OrderBy orderBy = super.getOrderBy(getCollation(), implementor, null); + - return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan); ++ return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, null, orderBy, plan); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java index 0a0ab8e,0000000..4f7f4dd mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java @@@ -1,74 -1,0 +1,74 @@@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.metadata.RelMdCollation; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.ClientScanPlan; +import org.apache.phoenix.expression.Expression; + +import com.google.common.base.Supplier; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Filter} + * relational expression in Phoenix. + */ +public class PhoenixFilter extends Filter implements PhoenixRel { + + public static PhoenixFilter create(final RelNode input, final RexNode condition) { + final RelOptCluster cluster = input.getCluster(); + final RelMetadataQuery mq = RelMetadataQuery.instance(); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixConvention.CLIENT) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return RelMdCollation.filter(mq, input); + } + }); + return new PhoenixFilter(cluster, traits, input, condition); + } + + private PhoenixFilter(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode condition) { + super(cluster, traits, input, condition); + } + + public PhoenixFilter copy(RelTraitSet traitSet, RelNode input, RexNode condition) { + return create(input, condition); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + if (!getInput().getConvention().satisfies(PhoenixConvention.GENERIC)) + return planner.getCostFactory().makeInfiniteCost(); + + return super.computeSelfCost(planner, mq).multiplyBy(PHOENIX_FACTOR); + } + + public QueryPlan implement(Implementor implementor) { + ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList; + ImmutableBitSet bitSet = InputFinder.analyze(condition).inputBitSet.addAll(columnRefList).build(); + columnRefList = ImmutableIntList.copyOf(bitSet.asList()); + implementor.pushContext(implementor.getCurrentContext().withColumnRefList(columnRefList)); + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + implementor.popContext(); + Expression expr = CalciteUtils.toExpression(condition, implementor); + return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), - plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan); ++ plan.getProjector(), null, null, expr, OrderBy.EMPTY_ORDER_BY, plan); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java index 236af84,0000000..0d77f86 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java @@@ -1,95 -1,0 +1,95 @@@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.SingleRel; +import org.apache.calcite.rel.metadata.RelMdCollation; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.ClientScanPlan; + +import com.google.common.base.Supplier; + +public class PhoenixLimit extends SingleRel implements PhoenixRel { + public final RexNode offset; + public final RexNode fetch; + + public static PhoenixLimit create(final RelNode input, RexNode offset, RexNode fetch) { + final RelOptCluster cluster = input.getCluster(); + final RelMetadataQuery mq = RelMetadataQuery.instance(); + final RelTraitSet traits = + cluster.traitSet().replace(PhoenixConvention.CLIENT) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return RelMdCollation.limit(mq, input); + } + }); + return new PhoenixLimit(cluster, traits, input, offset, fetch); + } + + private PhoenixLimit(RelOptCluster cluster, RelTraitSet traits, RelNode input, RexNode offset, RexNode fetch) { + super(cluster, traits, input); + this.offset = offset; + this.fetch = fetch; + } + + @Override + public PhoenixLimit copy( + RelTraitSet traitSet, + List<RelNode> newInputs) { + return create( + sole(newInputs), + offset, + fetch); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("offset", offset, offset != null) + .itemIf("fetch", fetch, fetch != null); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + if (!getInput().getConvention().satisfies(PhoenixConvention.GENERIC)) + return planner.getCostFactory().makeInfiniteCost(); + + double rowCount = mq.getRowCount(this); + return planner.getCostFactory() + .makeCost(rowCount, 0, 0) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public double estimateRowCount(RelMetadataQuery mq) { + double rows = super.estimateRowCount(mq); + return Math.min(RexLiteral.intValue(fetch), rows); + } + + @Override + public QueryPlan implement(Implementor implementor) { + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + int fetchValue = RexLiteral.intValue(fetch); + if (plan.getLimit() == null) { + return plan.limit(fetchValue); + } + + return new ClientScanPlan(plan.getContext(), plan.getStatement(), + implementor.getTableMapping().getTableRef(), RowProjector.EMPTY_PROJECTOR, - fetchValue, null, OrderBy.EMPTY_ORDER_BY, plan); ++ fetchValue, null, null, OrderBy.EMPTY_ORDER_BY, plan); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java index 9695662,0000000..f48d1ef mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java @@@ -1,78 -1,0 +1,78 @@@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.UnionPlan; +import org.apache.phoenix.parse.SelectStatement; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +public class PhoenixMergeSortUnion extends Union implements PhoenixRel { + public final RelCollation collation; + + public static PhoenixMergeSortUnion create(final List<RelNode> inputs, + final boolean all, final RelCollation collation) { + RelOptCluster cluster = inputs.get(0).getCluster(); + RelTraitSet traits = + cluster.traitSetOf(PhoenixConvention.CLIENT) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return ImmutableList.<RelCollation> of(collation); + } + }); + return new PhoenixMergeSortUnion(cluster, traits, inputs, all, collation); + } + + private PhoenixMergeSortUnion(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all, RelCollation collation) { + super(cluster, traits, inputs, all); + this.collation = collation; + } + + @Override + public PhoenixMergeSortUnion copy(RelTraitSet traits, List<RelNode> inputs, boolean all) { + return create(inputs, all, collation); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + for (RelNode input : getInputs()) { + if (!input.getConvention().satisfies(PhoenixConvention.GENERIC) + || !mq.collations(input).contains(collation)) { + return planner.getCostFactory().makeInfiniteCost(); + } + } + + double mergeSortFactor = 1.1; + return super.computeSelfCost(planner, mq) + .multiplyBy(PHOENIX_FACTOR).multiplyBy(mergeSortFactor); + } + + @Override + public QueryPlan implement(Implementor implementor) { + List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size()); + for (Ord<RelNode> input : Ord.zip(inputs)) { + subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e)); + } + + final OrderBy orderBy = PhoenixAbstractSort.getOrderBy(collation, implementor, null); + return new UnionPlan(subPlans.get(0).getContext(), SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(), RowProjector.EMPTY_PROJECTOR, - null, orderBy, GroupBy.EMPTY_GROUP_BY, subPlans, null); ++ null, null, orderBy, GroupBy.EMPTY_GROUP_BY, subPlans, null); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java index 7ea2581,0000000..68b7f04 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java @@@ -1,89 -1,0 +1,89 @@@ +package org.apache.phoenix.calcite.rel; + +import java.util.Arrays; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.AggregatePlan; +import org.apache.phoenix.execute.HashJoinPlan; +import org.apache.phoenix.execute.ScanPlan; + +public class PhoenixServerAggregate extends PhoenixAbstractAggregate { + + public static PhoenixServerAggregate create(RelNode input, boolean indicator, + ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, + List<AggregateCall> aggCalls) { + RelOptCluster cluster = input.getCluster(); + RelTraitSet traits = cluster.traitSetOf(PhoenixConvention.CLIENT); + return new PhoenixServerAggregate(cluster, traits, input, indicator, + groupSet, groupSets, aggCalls); + } + + private PhoenixServerAggregate(RelOptCluster cluster, RelTraitSet traits, + RelNode child, boolean indicator, ImmutableBitSet groupSet, + List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) { + super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls); + } + + @Override + public PhoenixServerAggregate copy(RelTraitSet traits, RelNode input, boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, List<AggregateCall> aggregateCalls) { + return create(input, indicator, groupSet, groupSets, aggregateCalls); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + if (!getInput().getConvention().satisfies(PhoenixConvention.SERVER) + && !getInput().getConvention().satisfies(PhoenixConvention.SERVERJOIN)) + return planner.getCostFactory().makeInfiniteCost(); + + return super.computeSelfCost(planner, mq) + .multiplyBy(SERVER_FACTOR) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList())); + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + implementor.popContext(); + + assert (plan instanceof ScanPlan + || plan instanceof HashJoinPlan) + && plan.getLimit() == null; + + ScanPlan basePlan; + HashJoinPlan hashJoinPlan = null; + if (plan instanceof ScanPlan) { + basePlan = (ScanPlan) plan; + } else { + hashJoinPlan = (HashJoinPlan) plan; + QueryPlan delegate = hashJoinPlan.getDelegate(); + assert delegate instanceof ScanPlan; + basePlan = (ScanPlan) delegate; + } + + StatementContext context = basePlan.getContext(); + GroupBy groupBy = super.getGroupBy(implementor); + super.serializeAggregators(implementor, context, groupBy.isEmpty()); + - QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null, basePlan.getDynamicFilter()); ++ QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null, basePlan.getDynamicFilter()); + if (hashJoinPlan != null) { + aggPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans()); + } + + return PhoenixAbstractAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions())); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index 54b1f27,0000000..e9324e2 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@@ -1,326 -1,0 +1,325 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.calcite.TableMapping; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.ScanRanges; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.compile.WhereCompiler; +import org.apache.phoenix.compile.WhereOptimizer; +import org.apache.phoenix.execute.ScanPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.iterate.BaseResultIterators; +import org.apache.phoenix.iterate.ParallelIteratorFactory; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.types.PDataType; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; + +/** + * Scan of a Phoenix table. + */ +public class PhoenixTableScan extends TableScan implements PhoenixRel { + public enum ScanOrder { + NONE, + FORWARD, + REVERSE, + } + + public final RexNode filter; + public final ScanOrder scanOrder; + public final ScanRanges scanRanges; + public final ImmutableBitSet extendedColumnRef; + + protected final Long estimatedBytes; + protected final float rowCountFactor; + + public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table) { + return create(cluster, table, null, + getDefaultScanOrder(table.unwrap(PhoenixTable.class)), null); + } + + public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table, + RexNode filter, final ScanOrder scanOrder, ImmutableBitSet extendedColumnRef) { + final RelTraitSet traits = + cluster.traitSetOf(PhoenixConvention.SERVER) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + if (scanOrder == ScanOrder.NONE) { + return ImmutableList.of(); + } + List<RelCollation> collations = table.getCollationList(); + return scanOrder == ScanOrder.FORWARD ? collations : reverse(collations); + } + }); + return new PhoenixTableScan(cluster, traits, table, filter, scanOrder, extendedColumnRef); + } + + private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, + RelOptTable table, RexNode filter, ScanOrder scanOrder, + ImmutableBitSet extendedColumnRef) { + super(cluster, traits, table); + this.filter = filter; + this.scanOrder = scanOrder; + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + this.rowCountFactor = phoenixTable.pc.getQueryServices() + .getProps().getFloat(PhoenixRel.ROW_COUNT_FACTOR, 1f); + try { + // TODO simplify this code + TableMapping tableMapping = phoenixTable.tableMapping; + PTable pTable = tableMapping.getPTable(); + SelectStatement select = SelectStatement.SELECT_ONE; + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef()); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + if (extendedColumnRef == null) { + extendedColumnRef = tableMapping.getDefaultExtendedColumnRef(); + } + if (filter == null) { + this.extendedColumnRef = extendedColumnRef; + } else { + this.extendedColumnRef = extendedColumnRef.union( + tableMapping.getExtendedColumnRef(ImmutableList.of(filter))); + // We use a implementor with a special implementation for field access + // here, which translates RexFieldAccess into a LiteralExpression + // with a sample value. This will achieve 3 goals at a time: + // 1) avoid getting exception when translating RexFieldAccess at this + // time when the correlate variable has not been defined yet. + // 2) get a guess of ScanRange even if the runtime value is absent. + // 3) test whether this dynamic filter is worth a recompile at runtime. + Implementor tmpImplementor = new PhoenixRelImplementorImpl(null) { + @SuppressWarnings("rawtypes") + @Override + public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { + try { + return LiteralExpression.newConstant(type.getSampleValue(), type); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }; + tmpImplementor.setTableMapping(tableMapping); + Expression filterExpr = CalciteUtils.toExpression(filter, tmpImplementor); + filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + WhereCompiler.setScanFilter(context, select, filterExpr, true, false); + } + this.scanRanges = context.getScanRanges(); + // TODO Get estimated byte count based on column reference list. + this.estimatedBytes = BaseResultIterators.getEstimatedCount(context, pTable).getSecond(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private static ScanOrder getDefaultScanOrder(PhoenixTable table) { + //TODO why attribute value not correct in connectUsingModel?? + //return table.pc.getQueryServices().getProps().getBoolean( + // QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, + // QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER) ? + // ScanOrder.FORWARD : ScanOrder.NONE; + return ScanOrder.NONE; + } + + private static List<RelCollation> reverse(List<RelCollation> collations) { + Builder<RelCollation> builder = ImmutableList.<RelCollation>builder(); + for (RelCollation collation : collations) { + builder.add(CalciteUtils.reverseCollation(collation)); + } + return builder.build(); + } + + public boolean isReverseScanEnabled() { + return table.unwrap(PhoenixTable.class).pc + .getQueryServices().getProps().getBoolean( + QueryServices.USE_REVERSE_SCAN_ATTRIB, + QueryServicesOptions.DEFAULT_USE_REVERSE_SCAN); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return this; + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .itemIf("filter", filter, filter != null) + .itemIf("scanOrder", scanOrder, scanOrder != ScanOrder.NONE) + .itemIf("extendedColumns", extendedColumnRef, !extendedColumnRef.isEmpty()); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + double byteCount; + PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + if (estimatedBytes != null) { + byteCount = estimatedBytes; + } else { + // If stats are not available, we estimate based on selectivity. + int pkCount = scanRanges.getBoundPkColumnCount(); + if (pkCount > 0) { + byteCount = phoenixTable.byteCount * Math.pow(mq.getSelectivity(this, filter), pkCount); + } else { + byteCount = phoenixTable.byteCount; + } + } + Pair<Integer, Integer> columnRefCount = + phoenixTable.tableMapping.getExtendedColumnReferenceCount(extendedColumnRef); + double extendedColumnMultiplier = 1 + columnRefCount.getFirst() * 10 + columnRefCount.getSecond() * 0.1; + byteCount *= extendedColumnMultiplier; + byteCount *= rowCountFactor; + if (scanOrder != ScanOrder.NONE) { + // We don't want to make a big difference here. The idea is to avoid + // forcing row key order whenever the order is absolutely useless. + // E.g. in "select count(*) from t" we do not need the row key order; + // while in "select * from t order by pk0" we should force row key + // order to avoid sorting. + // Another case is "select pk0, count(*) from t", where we'd like to + // choose the row key ordered TableScan rel so that the Aggregate rel + // above it can be an stream aggregate, although at runtime this will + // eventually be an AggregatePlan, in which the "forceRowKeyOrder" + // flag takes no effect. + byteCount = addEpsilon(byteCount); + if (scanOrder == ScanOrder.REVERSE) { + byteCount = addEpsilon(byteCount); + } + } + return planner.getCostFactory() + .makeCost(byteCount + 1, byteCount + 1, 0) + .multiplyBy(0.5) /* data scan only */ + .multiplyBy(SERVER_FACTOR) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public double estimateRowCount(RelMetadataQuery mq) { + double rows = super.estimateRowCount(mq); + if (filter != null && !filter.isAlwaysTrue()) { + rows = rows * mq.getSelectivity(this, filter); + } + + return rows * rowCountFactor; + } + + @Override + public List<RelCollation> getCollationList() { + return getTraitSet().getTraits(RelCollationTraitDef.INSTANCE); + } + + @Override + public QueryPlan implement(Implementor implementor) { + final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); + TableMapping tableMapping = phoenixTable.tableMapping; + implementor.setTableMapping(tableMapping); + try { + PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); + ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef()); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); + SelectStatement select = SelectStatement.SELECT_ONE; + ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList; + Expression filterExpr = LiteralExpression.newConstant(Boolean.TRUE); + Expression dynamicFilter = null; + if (filter != null) { + ImmutableBitSet bitSet = InputFinder.analyze(filter).inputBitSet.addAll(columnRefList).build(); + columnRefList = ImmutableIntList.copyOf(bitSet.asList()); + filterExpr = CalciteUtils.toExpression(filter, implementor); + } + Expression rem = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); + WhereCompiler.setScanFilter(context, select, rem, true, false); + // TODO This is not absolutely strict. We may have a filter like: + // pk = '0' and pk = $cor0 where $cor0 happens to get a sample value + // as '0', thus making the below test return false and adding an + // unnecessary dynamic filter. This would only be a performance bug though. + if (filter != null && !context.getScanRanges().equals(this.scanRanges)) { + dynamicFilter = filterExpr; + } + tableMapping.setupScanForExtendedTable(context.getScan(), extendedColumnRef, context.getConnection()); + projectColumnFamilies(context.getScan(), tableMapping.getMappedColumns(), columnRefList); + if (implementor.getCurrentContext().forceProject) { + boolean retainPKColumns = implementor.getCurrentContext().retainPKColumns; + TupleProjector tupleProjector = tableMapping.createTupleProjector(retainPKColumns); + TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); + PTable projectedTable = tableMapping.createProjectedTable(retainPKColumns); + implementor.setTableMapping(new TableMapping(projectedTable)); + } - Integer limit = null; + OrderBy orderBy = scanOrder == ScanOrder.NONE ? + OrderBy.EMPTY_ORDER_BY + : (scanOrder == ScanOrder.FORWARD ? + OrderBy.FWD_ROW_KEY_ORDER_BY + : OrderBy.REV_ROW_KEY_ORDER_BY); + ParallelIteratorFactory iteratorFactory = null; - return new ScanPlan(context, select, tableMapping.getTableRef(), RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, dynamicFilter); ++ return new ScanPlan(context, select, tableMapping.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, null, orderBy, iteratorFactory, true, dynamicFilter); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void projectColumnFamilies(Scan scan, List<PColumn> mappedColumns, ImmutableIntList columnRefList) { + scan.getFamilyMap().clear(); + for (Integer index : columnRefList) { + PColumn column = mappedColumns.get(index); + PName familyName = column.getFamilyName(); + if (familyName != null) { + scan.addFamily(familyName.getBytes()); + } + } + } + + private double addEpsilon(double d) { + assert d >= 0d; + final double d0 = d; + if (d < 10) { + // For small d, adding 1 would change the value significantly. + d *= 1.001d; + if (d != d0) { + return d; + } + } + // For medium d, add 1. Keeps integral values integral. + ++d; + if (d != d0) { + return d; + } + // For large d, adding 1 might not change the value. Add .1%. + // If d is NaN, this still will probably not change the value. That's OK. + d *= 1.001d; + return d; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java index 70bc71c,0000000..57393c8 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java @@@ -1,64 -1,0 +1,64 @@@ +package org.apache.phoenix.calcite.rel; + +import java.util.List; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.GroupByCompiler.GroupBy; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.execute.UnionPlan; +import org.apache.phoenix.parse.SelectStatement; +import com.google.common.collect.Lists; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Union} + * relational expression in Phoenix. + */ +public class PhoenixUnion extends Union implements PhoenixRel { + + public static PhoenixUnion create(List<RelNode> inputs, boolean all) { + RelOptCluster cluster = inputs.get(0).getCluster(); + RelTraitSet traits = cluster.traitSetOf(PhoenixConvention.CLIENT); + return new PhoenixUnion(cluster, traits, inputs, all); + } + + private PhoenixUnion(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) { + super(cluster, traits, inputs, all); + } + + @Override + public PhoenixUnion copy(RelTraitSet traits, List<RelNode> inputs, boolean all) { + return create(inputs, all); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + for (RelNode input : getInputs()) { + if (!input.getConvention().satisfies(PhoenixConvention.GENERIC)) { + return planner.getCostFactory().makeInfiniteCost(); + } + } + + return super.computeSelfCost(planner, mq) + .multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size()); + for (Ord<RelNode> input : Ord.zip(inputs)) { + subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e)); + } + + return new UnionPlan(subPlans.get(0).getContext(), SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(), RowProjector.EMPTY_PROJECTOR, - null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, subPlans, null); ++ null, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, subPlans, null); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java index 83947ae,0000000..c0fd272 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java @@@ -1,136 -1,0 +1,136 @@@ +package org.apache.phoenix.calcite.rel; + +import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Iterator; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelCollationTraitDef; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelDistributionTraitDef; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.metadata.RelMdCollation; +import org.apache.calcite.rel.metadata.RelMdDistribution; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexLiteral; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.TableMapping; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.execute.LiteralResultIterationPlan; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; + +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/** + * Implementation of {@link org.apache.calcite.rel.core.Values} + * relational expression in Phoenix. + */ +public class PhoenixValues extends Values implements PhoenixRel { + + private static final PhoenixConnection phoenixConnection; + static { + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + final Connection connection = + DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS); + phoenixConnection = + connection.unwrap(PhoenixConnection.class); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static PhoenixValues create(RelOptCluster cluster, final RelDataType rowType, final ImmutableList<ImmutableList<RexLiteral>> tuples) { + final RelMetadataQuery mq = RelMetadataQuery.instance(); + final RelTraitSet traits = + cluster.traitSetOf(PhoenixConvention.CLIENT) + .replaceIfs(RelCollationTraitDef.INSTANCE, + new Supplier<List<RelCollation>>() { + public List<RelCollation> get() { + return RelMdCollation.values(mq, rowType, tuples); + } + }) + .replaceIf(RelDistributionTraitDef.INSTANCE, + new Supplier<RelDistribution>() { + public RelDistribution get() { + return RelMdDistribution.values(rowType, tuples); + } + }); + return new PhoenixValues(cluster, rowType, tuples, traits); + } + + private PhoenixValues(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) { + super(cluster, rowType, tuples, traits); + } + + @Override + public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) { + assert inputs.isEmpty(); + return create(getCluster(), rowType, tuples); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + return super.computeSelfCost(planner, mq).multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + List<Tuple> literalResult = Lists.newArrayList(); + Iterator<ImmutableList<RexLiteral>> iter = getTuples().iterator(); + Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY); + while (iter.hasNext()) { + ImmutableList<RexLiteral> row = iter.next(); + List<Expression> exprs = Lists.newArrayListWithExpectedSize(row.size()); + for (RexLiteral rexLiteral : row) { + exprs.add(CalciteUtils.toExpression(rexLiteral, implementor)); + } + TupleProjector projector = implementor.project(exprs); + literalResult.add(projector.projectResults(baseTuple)); + } + PTable projectedTable = implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns); + TableMapping tableMapping = new TableMapping(projectedTable); + implementor.setTableMapping(tableMapping); + + try { + PhoenixStatement stmt = new PhoenixStatement(phoenixConnection); + ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef()); + StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); - return new LiteralResultIterationPlan(literalResult, context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null); ++ return new LiteralResultIterationPlan(literalResult, context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, null); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 794247b,b125ecc..e5ea255 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@@ -77,23 -76,20 +77,24 @@@ public class AggregatePlan extends Base private final Expression having; private List<KeyRange> splits; private List<List<Scan>> scans; + + public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) { - return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving(), plan.dynamicFilter); ++ return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving(), plan.dynamicFilter); + } - public AggregatePlan( - StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, - Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, - Expression having) { - this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, groupBy, having, null); + public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, + RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, + ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having) { + this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, groupBy, having, + null); } - - public AggregatePlan( - StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, - Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, - Expression having, Expression dynamicFilter) { - super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory, dynamicFilter); + - private AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, ++ public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, + RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, + ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having, + Expression dynamicFilter) { + super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, offset, + orderBy, groupBy, parallelIteratorFactory, dynamicFilter); this.having = having; this.aggregators = context.getAggregationManager().getAggregators(); } @@@ -245,13 -251,4 +254,13 @@@ public boolean useRoundRobinIterator() throws SQLException { return false; } + + @Override + public QueryPlan limit(Integer limit) { + if (limit == this.limit || (limit != null && limit.equals(this.limit))) + return this; + + return new AggregatePlan(this.context, this.statement, this.tableRef, this.projection, - limit, this.orderBy, this.parallelIteratorFactory, this.groupBy, this.having, this.dynamicFilter); ++ limit, this.offset, this.orderBy, this.parallelIteratorFactory, this.groupBy, this.having, this.dynamicFilter); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ----------------------------------------------------------------------