Sync with Phoenix master

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e0dc286a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e0dc286a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e0dc286a

Branch: refs/heads/calcite
Commit: e0dc286a4655dd5a704044601a498b2aecc395f7
Parents: 760723a 776eea9
Author: maryannxue <maryann....@gmail.com>
Authored: Fri Apr 8 11:40:40 2016 -0400
Committer: maryannxue <maryann....@gmail.com>
Committed: Fri Apr 8 11:40:40 2016 -0400

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AutoCommitIT.java    |  14 +-
 .../apache/phoenix/end2end/CreateTableIT.java   |   2 +-
 .../phoenix/end2end/CsvBulkLoadToolIT.java      |  32 +++
 .../apache/phoenix/end2end/DerivedTableIT.java  | 144 +++++++++-
 .../apache/phoenix/end2end/GroupByCaseIT.java   |  82 ++++++
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 170 +++++++++++-
 .../phoenix/end2end/QueryWithOffsetIT.java      | 211 +++++++++++++++
 .../org/apache/phoenix/end2end/ReadOnlyIT.java  |  12 +-
 .../apache/phoenix/end2end/SortMergeJoinIT.java |  44 +++
 .../phoenix/end2end/StatsCollectorIT.java       |   6 +
 .../org/apache/phoenix/end2end/SubqueryIT.java  |   6 +-
 .../end2end/SubqueryUsingSortMergeJoinIT.java   |   6 +-
 .../end2end/index/IndexExpressionIT.java        |   6 +-
 .../phoenix/end2end/index/LocalIndexIT.java     |   2 +-
 .../phoenix/end2end/index/ViewIndexIT.java      |  72 ++++-
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  16 +-
 .../calcite/rel/PhoenixAbstractAggregate.java   |   7 +-
 .../calcite/rel/PhoenixClientAggregate.java     |   2 +-
 .../phoenix/calcite/rel/PhoenixClientJoin.java  |   2 +-
 .../calcite/rel/PhoenixClientProject.java       |   2 +-
 .../phoenix/calcite/rel/PhoenixClientSort.java  |   2 +-
 .../phoenix/calcite/rel/PhoenixFilter.java      |   2 +-
 .../phoenix/calcite/rel/PhoenixLimit.java       |   2 +-
 .../calcite/rel/PhoenixMergeSortUnion.java      |   2 +-
 .../calcite/rel/PhoenixServerAggregate.java     |   2 +-
 .../phoenix/calcite/rel/PhoenixTableScan.java   |   3 +-
 .../phoenix/calcite/rel/PhoenixUnion.java       |   2 +-
 .../phoenix/calcite/rel/PhoenixValues.java      |   2 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  13 +-
 .../apache/phoenix/compile/GroupByCompiler.java | 265 +++++++++++--------
 .../apache/phoenix/compile/JoinCompiler.java    |  10 +-
 .../phoenix/compile/ListJarsQueryPlan.java      |   5 +
 .../apache/phoenix/compile/OffsetCompiler.java  | 114 ++++++++
 .../apache/phoenix/compile/OrderByCompiler.java |   3 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   3 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  59 +++--
 .../org/apache/phoenix/compile/QueryPlan.java   |   2 +
 .../phoenix/compile/StatementNormalizer.java    |   2 +-
 .../phoenix/compile/SubqueryRewriter.java       |  10 +-
 .../phoenix/compile/SubselectRewriter.java      |  17 +-
 .../apache/phoenix/compile/TraceQueryPlan.java  |   5 +
 .../apache/phoenix/compile/UpsertCompiler.java  |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   1 +
 .../phoenix/coprocessor/ScanRegionObserver.java |  95 ++++++-
 .../apache/phoenix/execute/AggregatePlan.java   |  53 ++--
 .../apache/phoenix/execute/BaseQueryPlan.java   |   9 +-
 .../phoenix/execute/ClientAggregatePlan.java    |  25 +-
 .../phoenix/execute/ClientProcessingPlan.java   |  10 +-
 .../apache/phoenix/execute/ClientScanPlan.java  |  40 ++-
 .../apache/phoenix/execute/CorrelatePlan.java   |   2 +-
 .../phoenix/execute/DegenerateQueryPlan.java    |   2 +-
 .../phoenix/execute/DelegateQueryPlan.java      |   4 +
 .../execute/LiteralResultIterationPlan.java     |  15 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  64 +++--
 .../phoenix/execute/SortMergeJoinPlan.java      |   7 +-
 .../org/apache/phoenix/execute/UnionPlan.java   |  17 +-
 .../apache/phoenix/execute/UnnestArrayPlan.java |   2 +-
 .../phoenix/iterate/BaseResultIterators.java    |  19 +-
 .../apache/phoenix/iterate/ExplainTable.java    |  16 +-
 .../phoenix/iterate/LimitingResultIterator.java |   2 +-
 .../iterate/MergeSortTopNResultIterator.java    |  21 +-
 .../phoenix/iterate/OffsetResultIterator.java   |  62 +++++
 .../OrderedAggregatingResultIterator.java       |   6 +-
 .../phoenix/iterate/OrderedResultIterator.java  |  58 ++--
 .../phoenix/iterate/ParallelIterators.java      |   4 +-
 .../apache/phoenix/iterate/SerialIterators.java |  19 +-
 .../phoenix/iterate/TableResultIterator.java    |  22 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  18 +-
 .../org/apache/phoenix/join/HashJoinInfo.java   |   2 +-
 .../phoenix/mapreduce/CsvBulkLoadTool.java      |   3 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../apache/phoenix/parse/DeleteStatement.java   |   6 +
 .../phoenix/parse/FilterableStatement.java      |   1 +
 .../org/apache/phoenix/parse/OffsetNode.java    |  67 +++++
 .../apache/phoenix/parse/ParseNodeFactory.java  |  55 ++--
 .../apache/phoenix/parse/ParseNodeRewriter.java |   2 +-
 .../apache/phoenix/parse/SelectStatement.java   |  35 ++-
 .../apache/phoenix/query/QueryConstants.java    |   6 +
 .../apache/phoenix/schema/types/PDouble.java    |   4 +-
 .../org/apache/phoenix/schema/types/PFloat.java |   4 +-
 .../apache/phoenix/schema/types/PInteger.java   |   2 +-
 .../org/apache/phoenix/schema/types/PLong.java  |   4 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |   8 +-
 .../java/org/apache/phoenix/util/QueryUtil.java |  30 ++-
 .../java/org/apache/phoenix/util/ScanUtil.java  |   4 +
 .../phoenix/compile/QueryCompilerTest.java      |  35 ++-
 .../phoenix/execute/CorrelatePlanTest.java      |  39 ++-
 .../execute/LiteralResultIteratorPlanTest.java  | 192 ++++++++++++++
 .../phoenix/execute/UnnestArrayPlanTest.java    |   3 +-
 .../query/ParallelIteratorsSplitTest.java       |   5 +
 .../apache/phoenix/spark/PhoenixSparkIT.scala   |   7 +-
 .../phoenix/spark/ConfigurationUtil.scala       |  27 +-
 .../org/apache/phoenix/spark/PhoenixRDD.scala   |  13 +-
 93 files changed, 2087 insertions(+), 427 deletions(-)
----------------------------------------------------------------------
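
The recurring "++" lines in the hunks below add one extra constructor argument (passed as null) to plans such as ClientScanPlan, ClientAggregatePlan, AggregatePlan and UnionPlan. Together with the new OffsetCompiler.java, OffsetNode.java, OffsetResultIterator.java and QueryWithOffsetIT.java entries in the diffstat above, this appears to thread the new OFFSET clause support through the plan constructors. The following minimal, hypothetical sketch shows what a client-side OFFSET amounts to (discarding the first N rows of a delegate iterator); the class and method names are illustrative only and are not Phoenix's actual OffsetResultIterator:

// Hypothetical sketch only: apply an OFFSET by skipping the first `offset`
// rows of a delegate iterator. Not Phoenix API; names are made up.
import java.util.Iterator;
import java.util.List;

final class OffsetIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private int remainingToSkip;

    OffsetIterator(Iterator<T> delegate, int offset) {
        this.delegate = delegate;
        this.remainingToSkip = Math.max(0, offset);
    }

    @Override
    public boolean hasNext() {
        // Lazily consume and discard rows until the offset is exhausted.
        while (remainingToSkip > 0 && delegate.hasNext()) {
            delegate.next();
            remainingToSkip--;
        }
        return delegate.hasNext();
    }

    @Override
    public T next() {
        hasNext(); // ensure the offset has been applied before returning a row
        return delegate.next();
    }

    public static void main(String[] args) {
        Iterator<String> rows = new OffsetIterator<>(
                List.of("a", "b", "c", "d").iterator(), 2);
        rows.forEachRemaining(System.out::println); // prints c and d
    }
}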


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
index dff5d4e,0000000..6d53db5
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
@@@ -1,213 -1,0 +1,210 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.util.List;
 +import java.util.Set;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelFieldCollation;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.RelWriter;
 +import org.apache.calcite.rel.core.Aggregate;
 +import org.apache.calcite.rel.core.AggregateCall;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.util.ImmutableBitSet;
 +import org.apache.calcite.util.ImmutableIntList;
 +import org.apache.phoenix.calcite.CalciteUtils;
 +import org.apache.phoenix.calcite.TableMapping;
 +import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 +import org.apache.phoenix.execute.TupleProjectionPlan;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.expression.RowKeyColumnExpression;
 +import org.apache.phoenix.expression.aggregator.ClientAggregators;
 +import org.apache.phoenix.expression.aggregator.ServerAggregators;
 +import org.apache.phoenix.expression.function.AggregateFunction;
 +import org.apache.phoenix.expression.function.SingleAggregateFunction;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.RowKeyValueAccessor;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Sets;
 +
 +/**
 + * Implementation of {@link org.apache.calcite.rel.core.Aggregate}
 + * relational expression in Phoenix.
 + */
 +abstract public class PhoenixAbstractAggregate extends Aggregate implements 
PhoenixRel {
 +    
 +    public static boolean isSingleValueCheckAggregate(Aggregate aggregate) {
 +        List<AggregateCall> aggCalls = aggregate.getAggCallList();
 +        if (aggCalls.size() != 1)
 +            return false;
 +        
 +        AggregateCall call = aggCalls.get(0);
 +        return call.getAggregation().getName().equals("SINGLE_VALUE");
 +    }
 +    
 +    protected static boolean isOrderedGroupSet(ImmutableBitSet groupSet, 
RelNode child) {
 +        if (groupSet.isEmpty()) {
 +            return true;
 +        }
 +        
 +        Set<Integer> ordinals = Sets.newHashSet(groupSet.asList());
 +        List<RelCollation> collations = 
child.getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);
 +        for (int i = 0; i < collations.size(); i++) {
 +            int count = 0;
 +            List<RelFieldCollation> fieldCollations = 
collations.get(i).getFieldCollations();
 +            if (fieldCollations.size() < ordinals.size()) {
 +                continue;
 +            }
 +            for (RelFieldCollation fieldCollation : 
fieldCollations.subList(0, ordinals.size())) {
 +                if (ordinals.contains(fieldCollation.getFieldIndex())) {
 +                    count++;
 +                }
 +            }
 +            if (count == ordinals.size()) {
 +                return true;
 +            }
 +        }
 +        
 +        return false;
 +    }
 +    
 +    public final boolean isOrderedGroupBy;
 +    
 +    protected PhoenixAbstractAggregate(RelOptCluster cluster, RelTraitSet 
traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, 
List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
 +        super(cluster, traits, child, indicator, groupSet, groupSets, 
aggCalls);
 +
 +        for (AggregateCall aggCall : aggCalls) {
 +            if (aggCall.isDistinct()) {
 +                throw new UnsupportedOperationException( "distinct 
aggregation not supported");
 +            }
 +        }
 +        switch (getGroupType()) {
 +            case SIMPLE:
 +                break;
 +            default:
 +                throw new UnsupportedOperationException("unsupported group 
type: " + getGroupType());
 +        }
 +        
 +        this.isOrderedGroupBy = isOrderedGroupSet(groupSet, child);
 +    }
 +    
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        if (isSingleValueCheckAggregate(this))
 +            return planner.getCostFactory().makeInfiniteCost();
 +        
 +        double rowCount = mq.getRowCount(this);
 +        double spoolSize = 0;
 +        if (!isOrderedGroupBy) {
 +            double bytesPerRow = mq.getAverageRowSize(this);
 +            spoolSize = rowCount * bytesPerRow * 6 /* map size */;
 +        }
 +        // Aggregates with more aggregate functions cost a bit more
 +        float multiplier = 1f + (float) aggCalls.size() * 0.125f;
 +        for (AggregateCall aggCall : aggCalls) {
 +          if (aggCall.getAggregation().getName().equals("SUM")) {
 +            // Pretend that SUM costs a little bit more than $SUM0,
 +            // to make things deterministic.
 +            multiplier += 0.0125f;
 +          }
 +        }
 +        return planner.getCostFactory().makeCost(rowCount * multiplier + 
spoolSize, 0, 0);
 +    }
 +
 +    @Override
 +    public RelWriter explainTerms(RelWriter pw) {
 +        return super.explainTerms(pw)
 +            .itemIf("isOrdered", isOrderedGroupBy, !groupSet.isEmpty());
 +    }
 +    
 +    protected ImmutableIntList getColumnRefList() {
 +        List<Integer> columnRefList = Lists.newArrayList();
 +        for (ImmutableBitSet set : groupSets) {
 +            columnRefList.addAll(set.asList());
 +        }
 +        // TODO filterArg??
 +        for (AggregateCall call : aggCalls) {
 +            columnRefList.addAll(call.getArgList());
 +        }
 +        return ImmutableIntList.copyOf(columnRefList);
 +    }
 +    
 +    protected GroupBy getGroupBy(Implementor implementor) {
 +        if (groupSets.size() > 1) {
 +            throw new UnsupportedOperationException();
 +        }
 +        
 +        List<Integer> ordinals = groupSet.asList();
 +        if (ordinals.isEmpty()) {
 +            return GroupBy.EMPTY_GROUP_BY;
 +        }
 +        
-         String groupExprAttribName = isOrderedGroupBy?
-                 BaseScannerRegionObserver.KEY_ORDERED_GROUP_BY_EXPRESSIONS
-               : BaseScannerRegionObserver.UNORDERED_GROUP_BY_EXPRESSIONS;
 +        // TODO sort group by keys. not sure if there is a way to avoid this sorting,
-         //      otherwise we would have add an extra projection.
++        //      otherwise we would have to add an extra projection.
 +        // TODO convert key types. can be avoided?
 +        List<Expression> keyExprs = Lists.newArrayListWithExpectedSize(ordinals.size());
 +        for (int i = 0; i < ordinals.size(); i++) {
 +            Expression expr = implementor.newColumnExpression(ordinals.get(i));
 +            keyExprs.add(expr);
 +        }
 +        
-         return new GroupBy.GroupByBuilder().setScanAttribName(groupExprAttribName).setExpressions(keyExprs).setKeyExpressions(keyExprs).build();
++        return new GroupBy.GroupByBuilder().setIsOrderPreserving(isOrderedGroupBy).setExpressions(keyExprs).setKeyExpressions(keyExprs).build();
 +    }
 +    
 +    protected void serializeAggregators(Implementor implementor, 
StatementContext context, boolean isEmptyGroupBy) {
 +        // TODO sort aggFuncs. same problem with group by key sorting.
 +        List<SingleAggregateFunction> aggFuncs = Lists.newArrayList();
 +        for (AggregateCall call : aggCalls) {
 +            AggregateFunction aggFunc = 
CalciteUtils.toAggregateFunction(call.getAggregation(), call.getArgList(), 
implementor);
 +            if (!(aggFunc instanceof SingleAggregateFunction)) {
 +                throw new UnsupportedOperationException();
 +            }
 +            aggFuncs.add((SingleAggregateFunction) aggFunc);
 +        }
 +        int minNullableIndex = getMinNullableIndex(aggFuncs, isEmptyGroupBy);
 +        context.getScan().setAttribute(BaseScannerRegionObserver.AGGREGATORS, 
ServerAggregators.serialize(aggFuncs, minNullableIndex));
 +        ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, 
minNullableIndex);
 +        context.getAggregationManager().setAggregators(clientAggregators);
 +    }
 +    
 +    protected static QueryPlan wrapWithProject(Implementor implementor, 
QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> 
aggFuncs) {
 +        List<Expression> exprs = Lists.newArrayList();
 +        for (int i = 0; i < keyExpressions.size(); i++) {
 +            Expression keyExpr = keyExpressions.get(i);
 +            RowKeyValueAccessor accessor = new 
RowKeyValueAccessor(keyExpressions, i);
 +            Expression expr = new RowKeyColumnExpression(keyExpr, accessor, 
keyExpr.getDataType());
 +            exprs.add(expr);
 +        }
 +        for (SingleAggregateFunction aggFunc : aggFuncs) {
 +            exprs.add(aggFunc);
 +        }
 +        
 +        TupleProjector tupleProjector = implementor.project(exprs);
 +        PTable projectedTable = 
implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns);
 +        implementor.setTableMapping(new TableMapping(projectedTable));
 +        return new TupleProjectionPlan(plan, tupleProjector, null);
 +    }
 +    
 +    private static int getMinNullableIndex(List<SingleAggregateFunction> 
aggFuncs, boolean isUngroupedAggregation) {
 +        int minNullableIndex = aggFuncs.size();
 +        for (int i = 0; i < aggFuncs.size(); i++) {
 +            SingleAggregateFunction aggFunc = aggFuncs.get(i);
 +            if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() 
: aggFunc.getAggregatorExpression().isNullable()) {
 +                minNullableIndex = i;
 +                break;
 +            }
 +        }
 +        return minNullableIndex;
 +    }
 +    
 +}
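
The isOrderedGroupSet() check above decides whether the GROUP BY can be evaluated in an order-preserving way: it succeeds when some known sort order of the child starts with (a permutation of) the grouping columns, and its result now feeds the builder via setIsOrderPreserving(isOrderedGroupBy) instead of selecting a scan attribute name. A simplified, self-contained paraphrase of that check using only standard collections (the class name is made up for illustration):

// Simplified paraphrase of the ordered-group-set check shown above.
// Illustrative only; not Phoenix code.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class OrderedGroupCheck {

    // groupOrdinals: column ordinals appearing in GROUP BY.
    // collations: each inner list is one known sort order of the input,
    // given as column ordinals in sort position order.
    static boolean isOrderedGroupSet(Set<Integer> groupOrdinals, List<List<Integer>> collations) {
        if (groupOrdinals.isEmpty()) {
            return true; // ungrouped aggregation is trivially order-preserving
        }
        for (List<Integer> sortOrder : collations) {
            if (sortOrder.size() < groupOrdinals.size()) {
                continue;
            }
            // Count how many of the leading sort columns are grouping columns.
            int count = 0;
            for (int ordinal : sortOrder.subList(0, groupOrdinals.size())) {
                if (groupOrdinals.contains(ordinal)) {
                    count++;
                }
            }
            if (count == groupOrdinals.size()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<Integer> group = new HashSet<>(List.of(0, 1));
        // Sorted by (0, 1, 3): the two leading sort columns cover the group set.
        System.out.println(isOrderedGroupSet(group, List.of(List.of(0, 1, 3)))); // true
        // Sorted by (2, 0, 1): the leading column 2 is not a grouping column.
        System.out.println(isOrderedGroupSet(group, List.of(List.of(2, 0, 1)))); // false
    }
}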

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
index 680e871,0000000..164d486
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
@@@ -1,82 -1,0 +1,82 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.sql.SQLException;
 +import java.util.Arrays;
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.AggregateCall;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.util.ImmutableBitSet;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.execute.ClientAggregatePlan;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.schema.TableRef;
 +
 +public class PhoenixClientAggregate extends PhoenixAbstractAggregate {
 +    
 +    public static PhoenixClientAggregate create(RelNode input, boolean 
indicator, 
 +            ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, 
 +            List<AggregateCall> aggCalls) {
 +        RelOptCluster cluster = input.getCluster();
 +        RelTraitSet traits = cluster.traitSetOf(PhoenixConvention.CLIENT);
 +        return new PhoenixClientAggregate(cluster, traits, input, indicator, 
 +                groupSet, groupSets, aggCalls);
 +    }
 +
 +    private PhoenixClientAggregate(RelOptCluster cluster, RelTraitSet traits,
 +            RelNode child, boolean indicator, ImmutableBitSet groupSet,
 +            List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
 +        super(cluster, traits, child, indicator, groupSet, groupSets, 
aggCalls);
 +    }
 +
 +    @Override
 +    public PhoenixClientAggregate copy(RelTraitSet traits, RelNode input,
 +            boolean indicator, ImmutableBitSet groupSet,
 +            List<ImmutableBitSet> groupSets, List<AggregateCall> 
aggregateCalls) {
 +        return create(input, indicator, groupSet, groupSets, aggregateCalls);
 +    }
 +    
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        if (!getInput().getConvention().satisfies(PhoenixConvention.CLIENT))
 +            return planner.getCostFactory().makeInfiniteCost();
 +        
 +        return super.computeSelfCost(planner, mq)
 +                .multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        
implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
 +        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
 +        implementor.popContext();
 +        
 +        TableRef tableRef = implementor.getTableMapping().getTableRef();
 +        PhoenixStatement stmt = plan.getContext().getStatement();
 +        StatementContext context;
 +        try {
 +            context = new StatementContext(stmt, 
FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt));
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }        
 +        GroupBy groupBy = super.getGroupBy(implementor);       
 +        super.serializeAggregators(implementor, context, groupBy.isEmpty());
 +        
-         QueryPlan aggPlan = new ClientAggregatePlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan);
++        QueryPlan aggPlan = new ClientAggregatePlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, null, OrderBy.EMPTY_ORDER_BY, groupBy, null, plan);
 +        
 +        return PhoenixAbstractAggregate.wrapWithProject(implementor, aggPlan, 
groupBy.getKeyExpressions(), 
Arrays.asList(context.getAggregationManager().getAggregators().getFunctions()));
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
index 53c0cf4,0000000..cb7dc54
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@@ -1,163 -1,0 +1,163 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.sql.SQLException;
 +import java.util.List;
 +import java.util.Set;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelCollations;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.CorrelationId;
 +import org.apache.calcite.rel.core.JoinInfo;
 +import org.apache.calcite.rel.core.JoinRelType;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.calcite.CalciteUtils;
 +import org.apache.phoenix.calcite.TableMapping;
 +import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
 +import org.apache.phoenix.compile.ColumnResolver;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.JoinCompiler;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.execute.ClientScanPlan;
 +import org.apache.phoenix.execute.SortMergeJoinPlan;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.parse.JoinTableNode.JoinType;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.TableRef;
 +
 +import com.google.common.base.Supplier;
 +import com.google.common.collect.Lists;
 +
 +public class PhoenixClientJoin extends PhoenixAbstractJoin {
 +    
 +    public static PhoenixClientJoin create(final RelNode left, final RelNode 
right, 
 +            RexNode condition, Set<CorrelationId> variablesSet, JoinRelType 
joinType,
 +            boolean isSingleValueRhs) {
 +        final RelOptCluster cluster = left.getCluster();
 +        final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
 +        final RelMetadataQuery mq = RelMetadataQuery.instance();
 +        final RelTraitSet traits =
 +                cluster.traitSet().replace(PhoenixConvention.CLIENT)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        return PhoenixRelMdCollation.mergeJoin(mq, left, 
right, joinInfo.leftKeys, joinInfo.rightKeys);
 +                    }
 +                });
 +        return new PhoenixClientJoin(cluster, traits, left, right, condition, 
variablesSet, joinType, isSingleValueRhs);
 +    }
 +
 +    private PhoenixClientJoin(RelOptCluster cluster, RelTraitSet traits,
 +            RelNode left, RelNode right, RexNode condition,
 +             Set<CorrelationId> variablesSet,JoinRelType joinType, boolean 
isSingleValueRhs) {
 +        super(cluster, traits, left, right, condition, variablesSet,
 +                joinType, isSingleValueRhs);
 +    }
 +
 +    @Override
 +    public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, 
RelNode left,
 +            RelNode right, JoinRelType joinRelType, boolean semiJoinDone) {
 +        return copy(traits, condition, left, right, joinRelType, 
semiJoinDone, isSingleValueRhs);
 +    }
 +
 +    @Override
 +    public PhoenixClientJoin copy(RelTraitSet traits, RexNode condition, 
RelNode left,
 +            RelNode right, JoinRelType joinRelType, boolean semiJoinDone, 
boolean isSingleValueRhs) {
 +        return create(left, right, condition, variablesSet, joinRelType, 
isSingleValueRhs);
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        if (!getLeft().getConvention().satisfies(PhoenixConvention.GENERIC) 
 +                || 
!getRight().getConvention().satisfies(PhoenixConvention.GENERIC)
 +                || !variablesSet.isEmpty())
 +            return planner.getCostFactory().makeInfiniteCost();            
 +        
 +        if (joinType == JoinRelType.RIGHT
 +                 || (!joinInfo.leftKeys.isEmpty() && 
!RelCollations.contains(mq.collations(getLeft()), joinInfo.leftKeys))
 +                 || (!joinInfo.rightKeys.isEmpty() && 
!RelCollations.contains(mq.collations(getRight()), joinInfo.rightKeys)))
 +            return planner.getCostFactory().makeInfiniteCost();
 +        
 +        double rowCount = mq.getRowCount(this);        
 +
 +        double leftRowCount = mq.getRowCount(getLeft());
 +        if (Double.isInfinite(leftRowCount)) {
 +            rowCount = leftRowCount;
 +        } else {
 +            rowCount += leftRowCount;
 +            double rightRowCount = mq.getRowCount(getRight());
 +            if (Double.isInfinite(rightRowCount)) {
 +                rowCount = rightRowCount;
 +            } else {
 +                rowCount += rightRowCount;
 +            }
 +        }            
 +        RelOptCost cost = planner.getCostFactory().makeCost(rowCount, 0, 0);
 +
 +        return cost.multiplyBy(SERVER_FACTOR).multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        List<Expression> leftExprs = Lists.<Expression> newArrayList();
 +        List<Expression> rightExprs = Lists.<Expression> newArrayList();
 +
 +        implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().retainPKColumns && 
getJoinType() != JoinRelType.FULL, true, getColumnRefList(0)));
 +        QueryPlan leftPlan = implementInput(implementor, 0, leftExprs);
 +        PTable leftTable = implementor.getTableMapping().getPTable();
 +        implementor.popContext();
 +
 +        implementor.pushContext(new ImplementorContext(false, true, 
getColumnRefList(1)));
 +        QueryPlan rightPlan = implementInput(implementor, 1, rightExprs);
 +        PTable rightTable = implementor.getTableMapping().getPTable();
 +        implementor.popContext();
 +        
 +        JoinType type = CalciteUtils.convertJoinType(getJoinType());
 +        PTable joinedTable;
 +        try {
 +            joinedTable = JoinCompiler.joinProjectedTables(leftTable, 
rightTable, type);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +        TableMapping tableMapping = new TableMapping(joinedTable);
 +        implementor.setTableMapping(tableMapping);
 +        TableRef tableRef = tableMapping.getTableRef();
 +        ColumnResolver resolver;
 +        try {
 +            resolver = FromCompiler.getResolver(tableRef);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +        PhoenixStatement stmt = leftPlan.getContext().getStatement();
 +        StatementContext context = new StatementContext(stmt, resolver, new 
Scan(), new SequenceManager(stmt));
 +
 +        QueryPlan plan = new SortMergeJoinPlan(context, 
leftPlan.getStatement(), 
 +                tableRef, type, leftPlan, rightPlan, leftExprs, rightExprs, 
 +                joinedTable, leftTable, rightTable, 
 +                leftTable.getColumns().size() - 
leftTable.getPKColumns().size(), 
 +                isSingleValueRhs);
 +        
 +        RexNode postFilter = 
joinInfo.getRemaining(getCluster().getRexBuilder());
 +        Expression postFilterExpr = postFilter.isAlwaysTrue() ? null : 
CalciteUtils.toExpression(postFilter, implementor);
 +        if (postFilter != null) {
 +            plan = new ClientScanPlan(context, plan.getStatement(), tableRef, 
-                     RowProjector.EMPTY_PROJECTOR, null, postFilterExpr, 
++                    RowProjector.EMPTY_PROJECTOR, null, null, postFilterExpr, 
 +                    OrderBy.EMPTY_ORDER_BY, plan);
 +        }
 +        
 +        return plan;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
index 799513d,0000000..a49ceb5
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
@@@ -1,101 -1,0 +1,101 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.sql.SQLException;
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.metadata.RelMdCollation;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.calcite.CalciteUtils;
 +import org.apache.phoenix.calcite.PhoenixSequence;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.execute.ClientScanPlan;
 +import org.apache.phoenix.execute.TupleProjectionPlan;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.schema.Sequence;
 +
 +import com.google.common.base.Supplier;
 +
 +public class PhoenixClientProject extends PhoenixAbstractProject {
 +    
 +    public static PhoenixClientProject create(final RelNode input, 
 +            final List<? extends RexNode> projects, RelDataType rowType) {
 +        final RelOptCluster cluster = input.getCluster();
 +        final RelMetadataQuery mq = RelMetadataQuery.instance();
 +        final RelTraitSet traits =
 +                cluster.traitSet().replace(PhoenixConvention.CLIENT)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        return RelMdCollation.project(mq, input, projects);
 +                    }
 +                });
 +        return new PhoenixClientProject(cluster, traits, input, projects, 
rowType);
 +    }
 +
 +    private PhoenixClientProject(RelOptCluster cluster, RelTraitSet traits,
 +            RelNode input, List<? extends RexNode> projects, RelDataType 
rowType) {
 +        super(cluster, traits, input, projects, rowType);
 +    }
 +
 +    @Override
 +    public PhoenixClientProject copy(RelTraitSet traits, RelNode input,
 +            List<RexNode> projects, RelDataType rowType) {
 +        return create(input, projects, rowType);
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        if (!getInput().getConvention().satisfies(PhoenixConvention.GENERIC))
 +            return planner.getCostFactory().makeInfiniteCost();
 +        
 +        return super.computeSelfCost(planner, mq)
 +                .multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        
implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
 +        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
 +        implementor.popContext();
 +        
 +        
 +        PhoenixSequence sequence = CalciteUtils.findSequence(this);
 +        final SequenceManager seqManager = sequence == null ?
 +                null : new SequenceManager(new PhoenixStatement(sequence.pc));
 +        implementor.setSequenceManager(seqManager);
 +        TupleProjector tupleProjector = project(implementor);
 +        if (seqManager != null) {
 +            try {
 +                
seqManager.validateSequences(Sequence.ValueOp.VALIDATE_SEQUENCE);
 +                StatementContext context = new StatementContext(
 +                        plan.getContext().getStatement(),
 +                        plan.getContext().getResolver(),
 +                        new Scan(), seqManager);
 +                plan = new ClientScanPlan(
 +                        context, plan.getStatement(), plan.getTableRef(), 
-                         RowProjector.EMPTY_PROJECTOR, null, null, 
++                        RowProjector.EMPTY_PROJECTOR, null, null, null,
 +                        OrderBy.EMPTY_ORDER_BY, plan);
 +            } catch (SQLException e) {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +        
 +        return new TupleProjectionPlan(plan, tupleProjector, null);
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
index 7c73530,0000000..16c0a81
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
@@@ -1,76 -1,0 +1,76 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.sql.SQLException;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.execute.ClientScanPlan;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.schema.TableRef;
 +
 +public class PhoenixClientSort extends PhoenixAbstractSort {
 +    
 +    public static PhoenixClientSort create(RelNode input, RelCollation 
collation) {
 +        RelOptCluster cluster = input.getCluster();
 +        collation = RelCollationTraitDef.INSTANCE.canonize(collation);
 +        RelTraitSet traits =
 +            
input.getTraitSet().replace(PhoenixConvention.CLIENT).replace(collation);
 +        return new PhoenixClientSort(cluster, traits, input, collation);
 +    }
 +
 +    private PhoenixClientSort(RelOptCluster cluster, RelTraitSet traits,
 +            RelNode child, RelCollation collation) {
 +        super(cluster, traits, child, collation);
 +    }
 +
 +    @Override
 +    public PhoenixClientSort copy(RelTraitSet traitSet, RelNode newInput,
 +            RelCollation newCollation, RexNode offset, RexNode fetch) {
 +        return create(newInput, newCollation);
 +    }
 +    
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        if (!getInput().getConvention().satisfies(PhoenixConvention.CLIENT))
 +            return planner.getCostFactory().makeInfiniteCost();
 +        
 +        return super.computeSelfCost(planner, mq)
 +                .multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        if (this.offset != null)
 +            throw new UnsupportedOperationException();
 +            
 +        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
 +        
 +        TableRef tableRef = implementor.getTableMapping().getTableRef();
 +        PhoenixStatement stmt = plan.getContext().getStatement();
 +        StatementContext context;
 +        try {
 +            context = new StatementContext(stmt, 
FromCompiler.getResolver(tableRef), new Scan(), new SequenceManager(stmt));
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +        
 +        OrderBy orderBy = super.getOrderBy(getCollation(), implementor, null);
 +        
-         return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, orderBy, plan);
++        return new ClientScanPlan(context, plan.getStatement(), tableRef, RowProjector.EMPTY_PROJECTOR, null, null, null, orderBy, plan);
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
index 0a0ab8e,0000000..4f7f4dd
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
@@@ -1,74 -1,0 +1,74 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.plan.RelOptUtil.InputFinder;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.Filter;
 +import org.apache.calcite.rel.metadata.RelMdCollation;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.calcite.util.ImmutableBitSet;
 +import org.apache.calcite.util.ImmutableIntList;
 +import org.apache.phoenix.calcite.CalciteUtils;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.execute.ClientScanPlan;
 +import org.apache.phoenix.expression.Expression;
 +
 +import com.google.common.base.Supplier;
 +
 +/**
 + * Implementation of {@link org.apache.calcite.rel.core.Filter}
 + * relational expression in Phoenix.
 + */
 +public class PhoenixFilter extends Filter implements PhoenixRel {
 +    
 +    public static PhoenixFilter create(final RelNode input, final RexNode 
condition) {
 +        final RelOptCluster cluster = input.getCluster();
 +        final RelMetadataQuery mq = RelMetadataQuery.instance();
 +        final RelTraitSet traits =
 +                cluster.traitSet().replace(PhoenixConvention.CLIENT)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        return RelMdCollation.filter(mq, input);
 +                    }
 +                });
 +        return new PhoenixFilter(cluster, traits, input, condition);
 +    }
 +    
 +    private PhoenixFilter(RelOptCluster cluster, RelTraitSet traits, RelNode 
input, RexNode condition) {
 +        super(cluster, traits, input, condition);
 +    }
 +
 +    public PhoenixFilter copy(RelTraitSet traitSet, RelNode input, RexNode 
condition) {
 +        return create(input, condition);
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        if (!getInput().getConvention().satisfies(PhoenixConvention.GENERIC))
 +            return planner.getCostFactory().makeInfiniteCost();
 +        
 +        return super.computeSelfCost(planner, mq).multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    public QueryPlan implement(Implementor implementor) {
 +        ImmutableIntList columnRefList = 
implementor.getCurrentContext().columnRefList;
 +        ImmutableBitSet bitSet = 
InputFinder.analyze(condition).inputBitSet.addAll(columnRefList).build();
 +        columnRefList = ImmutableIntList.copyOf(bitSet.asList());
 +        
implementor.pushContext(implementor.getCurrentContext().withColumnRefList(columnRefList));
 +        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
 +        implementor.popContext();
 +        Expression expr = CalciteUtils.toExpression(condition, implementor);
 +        return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(),
-                 plan.getProjector(), null, expr, OrderBy.EMPTY_ORDER_BY, plan);
++                plan.getProjector(), null, null, expr, OrderBy.EMPTY_ORDER_BY, plan);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
index 236af84,0000000..0d77f86
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
@@@ -1,95 -1,0 +1,95 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.RelWriter;
 +import org.apache.calcite.rel.SingleRel;
 +import org.apache.calcite.rel.metadata.RelMdCollation;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rex.RexLiteral;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.execute.ClientScanPlan;
 +
 +import com.google.common.base.Supplier;
 +
 +public class PhoenixLimit extends SingleRel implements PhoenixRel {
 +    public final RexNode offset;
 +    public final RexNode fetch;
 +    
 +    public static PhoenixLimit create(final RelNode input, RexNode offset, 
RexNode fetch) {
 +        final RelOptCluster cluster = input.getCluster();
 +        final RelMetadataQuery mq = RelMetadataQuery.instance();
 +        final RelTraitSet traits =
 +                cluster.traitSet().replace(PhoenixConvention.CLIENT)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        return RelMdCollation.limit(mq, input);
 +                    }
 +                });
 +        return new PhoenixLimit(cluster, traits, input, offset, fetch);
 +    }
 +
 +    private PhoenixLimit(RelOptCluster cluster, RelTraitSet traits, RelNode 
input, RexNode offset, RexNode fetch) {
 +        super(cluster, traits, input);
 +        this.offset = offset;
 +        this.fetch = fetch;
 +    }
 +    
 +    @Override
 +    public PhoenixLimit copy(
 +            RelTraitSet traitSet,
 +            List<RelNode> newInputs) {
 +        return create(
 +                sole(newInputs),
 +                offset,
 +                fetch);
 +    }
 +
 +    @Override 
 +    public RelWriter explainTerms(RelWriter pw) {
 +        return super.explainTerms(pw)
 +                .itemIf("offset", offset, offset != null)
 +                .itemIf("fetch", fetch, fetch != null);
 +    }
 +
 +    @Override 
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        if (!getInput().getConvention().satisfies(PhoenixConvention.GENERIC))
 +            return planner.getCostFactory().makeInfiniteCost();
 +        
 +        double rowCount = mq.getRowCount(this);
 +        return planner.getCostFactory()
 +                .makeCost(rowCount, 0, 0)
 +                .multiplyBy(PHOENIX_FACTOR);
 +    }
 +    
 +    @Override 
 +    public double estimateRowCount(RelMetadataQuery mq) {
 +        double rows = super.estimateRowCount(mq);        
 +        return Math.min(RexLiteral.intValue(fetch), rows);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
 +        int fetchValue = RexLiteral.intValue(fetch);
 +        if (plan.getLimit() == null) {
 +            return plan.limit(fetchValue);
 +        }
 +        
 +        return new ClientScanPlan(plan.getContext(), plan.getStatement(), 
 +                implementor.getTableMapping().getTableRef(), RowProjector.EMPTY_PROJECTOR,
-                 fetchValue, null, OrderBy.EMPTY_ORDER_BY, plan);
++                fetchValue, null, null, OrderBy.EMPTY_ORDER_BY, plan);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
index 9695662,0000000..f48d1ef
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
@@@ -1,78 -1,0 +1,78 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.util.List;
 +
 +import org.apache.calcite.linq4j.Ord;
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.Union;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.execute.UnionPlan;
 +import org.apache.phoenix.parse.SelectStatement;
 +
 +import com.google.common.base.Supplier;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Lists;
 +
 +public class PhoenixMergeSortUnion extends Union implements PhoenixRel {
 +      public final RelCollation collation;
 +    
 +    public static PhoenixMergeSortUnion create(final List<RelNode> inputs,
 +            final boolean all, final RelCollation collation) {
 +        RelOptCluster cluster = inputs.get(0).getCluster();
 +        RelTraitSet traits = 
 +                      cluster.traitSetOf(PhoenixConvention.CLIENT)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        return ImmutableList.<RelCollation> of(collation);
 +                    }
 +                });
 +        return new PhoenixMergeSortUnion(cluster, traits, inputs, all, 
collation);
 +    }
 +    
 +    private PhoenixMergeSortUnion(RelOptCluster cluster, RelTraitSet traits, 
List<RelNode> inputs, boolean all, RelCollation collation) {
 +        super(cluster, traits, inputs, all);
 +        this.collation = collation;
 +    }
 +
 +    @Override
 +    public PhoenixMergeSortUnion copy(RelTraitSet traits, List<RelNode> 
inputs, boolean all) {
 +        return create(inputs, all, collation);
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        for (RelNode input : getInputs()) {
 +            if (!input.getConvention().satisfies(PhoenixConvention.GENERIC)
 +                    || !mq.collations(input).contains(collation)) {
 +                return planner.getCostFactory().makeInfiniteCost();
 +            }
 +        }
 +        
 +        double mergeSortFactor = 1.1;
 +        return super.computeSelfCost(planner, mq)
 +                .multiplyBy(PHOENIX_FACTOR).multiplyBy(mergeSortFactor);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        List<QueryPlan> subPlans = 
Lists.newArrayListWithExpectedSize(inputs.size());
 +        for (Ord<RelNode> input : Ord.zip(inputs)) {
 +            subPlans.add(implementor.visitInput(input.i, (PhoenixRel) 
input.e));
 +        }
 +        
 +        final OrderBy orderBy = PhoenixAbstractSort.getOrderBy(collation, 
implementor, null);
 +        return new UnionPlan(subPlans.get(0).getContext(), SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(), RowProjector.EMPTY_PROJECTOR,
-                 null, orderBy, GroupBy.EMPTY_GROUP_BY, subPlans, null);
++                null, null, orderBy, GroupBy.EMPTY_GROUP_BY, subPlans, null);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
index 7ea2581,0000000..68b7f04
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
@@@ -1,89 -1,0 +1,89 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.util.Arrays;
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.AggregateCall;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.util.ImmutableBitSet;
 +import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.execute.AggregatePlan;
 +import org.apache.phoenix.execute.HashJoinPlan;
 +import org.apache.phoenix.execute.ScanPlan;
 +
 +public class PhoenixServerAggregate extends PhoenixAbstractAggregate {
 +    
 +    public static PhoenixServerAggregate create(RelNode input, boolean 
indicator, 
 +            ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, 
 +            List<AggregateCall> aggCalls) {
 +        RelOptCluster cluster = input.getCluster();
 +        RelTraitSet traits = cluster.traitSetOf(PhoenixConvention.CLIENT);
 +        return new PhoenixServerAggregate(cluster, traits, input, indicator, 
 +                groupSet, groupSets, aggCalls);
 +    }
 +
 +    private PhoenixServerAggregate(RelOptCluster cluster, RelTraitSet traits,
 +            RelNode child, boolean indicator, ImmutableBitSet groupSet,
 +            List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls) {
 +        super(cluster, traits, child, indicator, groupSet, groupSets, 
aggCalls);
 +    }
 +
 +    @Override
 +    public PhoenixServerAggregate copy(RelTraitSet traits, RelNode input, 
boolean indicator, ImmutableBitSet groupSet, List<ImmutableBitSet> groupSets, 
List<AggregateCall> aggregateCalls) {
 +        return create(input, indicator, groupSet, groupSets, aggregateCalls);
 +    }
 +    
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
 +        if (!getInput().getConvention().satisfies(PhoenixConvention.SERVER)
 +                && 
!getInput().getConvention().satisfies(PhoenixConvention.SERVERJOIN))
 +            return planner.getCostFactory().makeInfiniteCost();
 +        
 +        return super.computeSelfCost(planner, mq)
 +                .multiplyBy(SERVER_FACTOR)
 +                .multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        
implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
 +        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
 +        implementor.popContext();
 +        
 +        assert (plan instanceof ScanPlan 
 +                    || plan instanceof HashJoinPlan)
 +                && plan.getLimit() == null;
 +        
 +        ScanPlan basePlan;
 +        HashJoinPlan hashJoinPlan = null;
 +        if (plan instanceof ScanPlan) {
 +            basePlan = (ScanPlan) plan;
 +        } else {
 +            hashJoinPlan = (HashJoinPlan) plan;
 +            QueryPlan delegate = hashJoinPlan.getDelegate();
 +            assert delegate instanceof ScanPlan;
 +            basePlan = (ScanPlan) delegate;
 +        }
 +        
 +        StatementContext context = basePlan.getContext();        
 +        GroupBy groupBy = super.getGroupBy(implementor);       
 +        super.serializeAggregators(implementor, context, groupBy.isEmpty());
 +        
-         QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null, basePlan.getDynamicFilter());
++        QueryPlan aggPlan = new AggregatePlan(context, basePlan.getStatement(), basePlan.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, null, groupBy, null, basePlan.getDynamicFilter());
 +        if (hashJoinPlan != null) {        
 +            aggPlan = HashJoinPlan.create(hashJoinPlan.getStatement(), aggPlan, hashJoinPlan.getJoinInfo(), hashJoinPlan.getSubPlans());
 +        }
 +        
 +        return PhoenixAbstractAggregate.wrapWithProject(implementor, aggPlan, groupBy.getKeyExpressions(), Arrays.asList(context.getAggregationManager().getAggregators().getFunctions()));
 +    }
 +
 +}
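
For reference, the conflict resolved above comes down to AggregatePlan gaining an Integer offset parameter immediately after limit; the merged call passes null for both. A purely illustrative, argument-labelled sketch of that call follows (the positional comments are assumptions read off the widened constructor in the AggregatePlan.java hunk further down, not part of the patch):

    QueryPlan aggPlan = new AggregatePlan(
            context,
            basePlan.getStatement(),
            basePlan.getTableRef(),
            RowProjector.EMPTY_PROJECTOR,
            null,                        // limit
            null,                        // offset (new in this merge)
            OrderBy.EMPTY_ORDER_BY,
            null,                        // parallelIteratorFactory
            groupBy,
            null,                        // having
            basePlan.getDynamicFilter());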

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index 54b1f27,0000000..e9324e2
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@@ -1,326 -1,0 +1,325 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.sql.SQLException;
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelOptTable;
 +import org.apache.calcite.plan.RelOptUtil.InputFinder;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.RelWriter;
 +import org.apache.calcite.rel.core.TableScan;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.calcite.util.ImmutableBitSet;
 +import org.apache.calcite.util.ImmutableIntList;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.util.Pair;
 +import org.apache.phoenix.calcite.CalciteUtils;
 +import org.apache.phoenix.calcite.PhoenixTable;
 +import org.apache.phoenix.calcite.TableMapping;
 +import org.apache.phoenix.compile.ColumnResolver;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.ScanRanges;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.compile.WhereCompiler;
 +import org.apache.phoenix.compile.WhereOptimizer;
 +import org.apache.phoenix.execute.ScanPlan;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.expression.LiteralExpression;
 +import org.apache.phoenix.iterate.BaseResultIterators;
 +import org.apache.phoenix.iterate.ParallelIteratorFactory;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.parse.SelectStatement;
 +import org.apache.phoenix.query.QueryServices;
 +import org.apache.phoenix.query.QueryServicesOptions;
 +import org.apache.phoenix.schema.PColumn;
 +import org.apache.phoenix.schema.PName;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.types.PDataType;
 +import com.google.common.base.Supplier;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.ImmutableList.Builder;
 +
 +/**
 + * Scan of a Phoenix table.
 + */
 +public class PhoenixTableScan extends TableScan implements PhoenixRel {
 +    public enum ScanOrder {
 +        NONE,
 +        FORWARD,
 +        REVERSE,
 +    }
 +    
 +    public final RexNode filter;
 +    public final ScanOrder scanOrder;
 +    public final ScanRanges scanRanges;
 +    public final ImmutableBitSet extendedColumnRef;
 +    
 +    protected final Long estimatedBytes;
 +    protected final float rowCountFactor;
 +    
 +    public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table) {
 +        return create(cluster, table, null,
 +                getDefaultScanOrder(table.unwrap(PhoenixTable.class)), null);
 +    }
 +
 +    public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table,
 +            RexNode filter, final ScanOrder scanOrder, ImmutableBitSet extendedColumnRef) {
 +        final RelTraitSet traits =
 +                cluster.traitSetOf(PhoenixConvention.SERVER)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        if (scanOrder == ScanOrder.NONE) {
 +                            return ImmutableList.of();
 +                        }
 +                        List<RelCollation> collations = table.getCollationList();
 +                        return scanOrder == ScanOrder.FORWARD ? collations : reverse(collations);
 +                    }
 +                });
 +        return new PhoenixTableScan(cluster, traits, table, filter, scanOrder, extendedColumnRef);
 +    }
 +
 +    private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits,
 +            RelOptTable table, RexNode filter, ScanOrder scanOrder,
 +            ImmutableBitSet extendedColumnRef) {
 +        super(cluster, traits, table);
 +        this.filter = filter;
 +        this.scanOrder = scanOrder;
 +        final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
 +        this.rowCountFactor = phoenixTable.pc.getQueryServices()
 +                .getProps().getFloat(PhoenixRel.ROW_COUNT_FACTOR, 1f);
 +        try {
 +            // TODO simplify this code
 +            TableMapping tableMapping = phoenixTable.tableMapping;
 +            PTable pTable = tableMapping.getPTable();
 +            SelectStatement select = SelectStatement.SELECT_ONE;
 +            PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
 +            ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef());
 +            StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
 +            if (extendedColumnRef == null) {
 +                extendedColumnRef = tableMapping.getDefaultExtendedColumnRef();
 +            }
 +            if (filter == null) {
 +                this.extendedColumnRef = extendedColumnRef;
 +            } else {
 +                this.extendedColumnRef = extendedColumnRef.union(
 +                        tableMapping.getExtendedColumnRef(ImmutableList.of(filter)));
 +                // We use an implementor with a special implementation for field access
 +                // here, which translates RexFieldAccess into a LiteralExpression
 +                // with a sample value. This achieves three goals at once:
 +                // 1) avoid an exception when translating RexFieldAccess at this
 +                //    time, when the correlate variable has not been defined yet.
 +                // 2) get a guess of the ScanRange even if the runtime value is absent.
 +                // 3) test whether this dynamic filter is worth a recompile at runtime.
 +                Implementor tmpImplementor = new PhoenixRelImplementorImpl(null) {
 +                    @SuppressWarnings("rawtypes")
 +                    @Override
 +                    public Expression newFieldAccessExpression(String variableId, int index, PDataType type) {
 +                        try {
 +                            return LiteralExpression.newConstant(type.getSampleValue(), type);
 +                        } catch (SQLException e) {
 +                            throw new RuntimeException(e);
 +                        }
 +                    }                    
 +                };
 +                tmpImplementor.setTableMapping(tableMapping);
 +                Expression filterExpr = CalciteUtils.toExpression(filter, tmpImplementor);
 +                filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr);
 +                WhereCompiler.setScanFilter(context, select, filterExpr, true, false);
 +            }        
 +            this.scanRanges = context.getScanRanges();
 +            // TODO Get estimated byte count based on column reference list.
 +            this.estimatedBytes = BaseResultIterators.getEstimatedCount(context, pTable).getSecond();
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
 +    private static ScanOrder getDefaultScanOrder(PhoenixTable table) {
 +        //TODO why attribute value not correct in connectUsingModel??
 +        //return table.pc.getQueryServices().getProps().getBoolean(
 +        //        QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB,
 +        //        QueryServicesOptions.DEFAULT_FORCE_ROW_KEY_ORDER) ?
 +        //                ScanOrder.FORWARD : ScanOrder.NONE;
 +        return ScanOrder.NONE;
 +    }
 +    
 +    private static List<RelCollation> reverse(List<RelCollation> collations) {
 +        Builder<RelCollation> builder = ImmutableList.<RelCollation>builder();
 +        for (RelCollation collation : collations) {
 +            builder.add(CalciteUtils.reverseCollation(collation));
 +        }
 +        return builder.build();
 +    }
 +    
 +    public boolean isReverseScanEnabled() {
 +        return table.unwrap(PhoenixTable.class).pc
 +                .getQueryServices().getProps().getBoolean(
 +                        QueryServices.USE_REVERSE_SCAN_ATTRIB,
 +                        QueryServicesOptions.DEFAULT_USE_REVERSE_SCAN);
 +    }
 +
 +    @Override
 +    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
 +        assert inputs.isEmpty();
 +        return this;
 +    }
 +
 +    @Override
 +    public RelWriter explainTerms(RelWriter pw) {
 +        return super.explainTerms(pw)
 +            .itemIf("filter", filter, filter != null)
 +            .itemIf("scanOrder", scanOrder, scanOrder != ScanOrder.NONE)
 +            .itemIf("extendedColumns", extendedColumnRef, 
!extendedColumnRef.isEmpty());
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
 +        double byteCount;
 +        PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
 +        if (estimatedBytes != null) {
 +            byteCount = estimatedBytes;
 +        } else {
 +            // If stats are not available, we estimate based on selectivity.
 +            int pkCount = scanRanges.getBoundPkColumnCount();
 +            if (pkCount > 0) {
 +                byteCount = phoenixTable.byteCount * Math.pow(mq.getSelectivity(this, filter), pkCount);
 +            } else {
 +                byteCount = phoenixTable.byteCount;
 +            }
 +        }
 +        Pair<Integer, Integer> columnRefCount =
 +                phoenixTable.tableMapping.getExtendedColumnReferenceCount(extendedColumnRef);
 +        double extendedColumnMultiplier = 1 + columnRefCount.getFirst() * 10 + columnRefCount.getSecond() * 0.1;
 +        byteCount *= extendedColumnMultiplier;
 +        byteCount *= rowCountFactor;
 +        if (scanOrder != ScanOrder.NONE) {
 +            // We don't want to make a big difference here. The idea is to avoid
 +            // forcing row key order whenever the order is absolutely useless.
 +            // E.g. in "select count(*) from t" we do not need the row key order,
 +            // while in "select * from t order by pk0" we should force row key
 +            // order to avoid sorting.
 +            // Another case is "select pk0, count(*) from t", where we'd like to
 +            // choose the row-key-ordered TableScan rel so that the Aggregate rel
 +            // above it can be a stream aggregate, although at runtime this will
 +            // eventually be an AggregatePlan, in which the "forceRowKeyOrder"
 +            // flag has no effect.
 +            byteCount = addEpsilon(byteCount);
 +            if (scanOrder == ScanOrder.REVERSE) {
 +                byteCount = addEpsilon(byteCount);
 +            }
 +        }
 +        return planner.getCostFactory()
 +                .makeCost(byteCount + 1, byteCount + 1, 0)
 +                .multiplyBy(0.5) /* data scan only */
 +                .multiplyBy(SERVER_FACTOR)
 +                .multiplyBy(PHOENIX_FACTOR);
 +    }
 +    
 +    @Override
 +    public double estimateRowCount(RelMetadataQuery mq) {
 +        double rows = super.estimateRowCount(mq);
 +        if (filter != null && !filter.isAlwaysTrue()) {
 +            rows = rows * mq.getSelectivity(this, filter);
 +        }
 +        
 +        return rows * rowCountFactor;
 +    }
 +    
 +    @Override
 +    public List<RelCollation> getCollationList() {
 +        return getTraitSet().getTraits(RelCollationTraitDef.INSTANCE);        
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
 +        TableMapping tableMapping = phoenixTable.tableMapping;
 +        implementor.setTableMapping(tableMapping);
 +        try {
 +            PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
 +            ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef());
 +            StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
 +            SelectStatement select = SelectStatement.SELECT_ONE;
 +            ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList;
 +            Expression filterExpr = LiteralExpression.newConstant(Boolean.TRUE);
 +            Expression dynamicFilter = null;
 +            if (filter != null) {
 +                ImmutableBitSet bitSet = InputFinder.analyze(filter).inputBitSet.addAll(columnRefList).build();
 +                columnRefList = ImmutableIntList.copyOf(bitSet.asList());
 +                filterExpr = CalciteUtils.toExpression(filter, implementor);
 +            }
 +            Expression rem = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr);
 +            WhereCompiler.setScanFilter(context, select, rem, true, false);
 +            // TODO This is not absolutely strict. We may have a filter like:
 +            // pk = '0' and pk = $cor0, where $cor0 happens to get a sample value
 +            // of '0', thus making the below test return false and adding an
 +            // unnecessary dynamic filter. This would only be a performance bug though.
 +            if (filter != null && !context.getScanRanges().equals(this.scanRanges)) {
 +                dynamicFilter = filterExpr;
 +            }
 +            tableMapping.setupScanForExtendedTable(context.getScan(), extendedColumnRef, context.getConnection());
 +            projectColumnFamilies(context.getScan(), tableMapping.getMappedColumns(), columnRefList);
 +            if (implementor.getCurrentContext().forceProject) {
 +                boolean retainPKColumns = implementor.getCurrentContext().retainPKColumns;
 +                TupleProjector tupleProjector = tableMapping.createTupleProjector(retainPKColumns);
 +                TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
 +                PTable projectedTable = tableMapping.createProjectedTable(retainPKColumns);
 +                implementor.setTableMapping(new TableMapping(projectedTable));
 +            }
-             Integer limit = null;
 +            OrderBy orderBy = scanOrder == ScanOrder.NONE ?
 +                      OrderBy.EMPTY_ORDER_BY
 +                    : (scanOrder == ScanOrder.FORWARD ?
 +                              OrderBy.FWD_ROW_KEY_ORDER_BY
 +                            : OrderBy.REV_ROW_KEY_ORDER_BY);
 +            ParallelIteratorFactory iteratorFactory = null;
-             return new ScanPlan(context, select, tableMapping.getTableRef(), RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, dynamicFilter);
++            return new ScanPlan(context, select, tableMapping.getTableRef(), RowProjector.EMPTY_PROJECTOR, null, null, orderBy, iteratorFactory, true, dynamicFilter);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
 +    private void projectColumnFamilies(Scan scan, List<PColumn> mappedColumns, ImmutableIntList columnRefList) {
 +        scan.getFamilyMap().clear();
 +        for (Integer index : columnRefList) {
 +            PColumn column = mappedColumns.get(index);
 +            PName familyName = column.getFamilyName();
 +            if (familyName != null) {
 +                scan.addFamily(familyName.getBytes());
 +            }
 +        }
 +    }
 +
 +    private double addEpsilon(double d) {
 +      assert d >= 0d;
 +      final double d0 = d;
 +      if (d < 10) {
 +        // For small d, adding 1 would change the value significantly.
 +        d *= 1.001d;
 +        if (d != d0) {
 +          return d;
 +        }
 +      }
 +      // For medium d, add 1. Keeps integral values integral.
 +      ++d;
 +      if (d != d0) {
 +        return d;
 +      }
 +      // For large d, adding 1 might not change the value. Add .1%.
 +      // If d is NaN, this still will probably not change the value. That's OK.
 +      d *= 1.001d;
 +      return d;
 +    }
 +}
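
The cost model above separates the three scan orders only by an epsilon, so row-key order wins only when it saves work elsewhere in the plan. A minimal, self-contained sketch of that effect (the class and variable names are ours; the helper mirrors the addEpsilon() logic shown above):

    // AddEpsilonDemo.java -- illustrative only.
    public class AddEpsilonDemo {
        // Same nudging as PhoenixTableScan.addEpsilon(): a tiny relative bump for
        // small values, +1 for medium values, +0.1% for very large values.
        private static double addEpsilon(double d) {
            assert d >= 0d;
            final double d0 = d;
            if (d < 10) {
                d *= 1.001d;
                if (d != d0) {
                    return d;
                }
            }
            ++d;
            if (d != d0) {
                return d;
            }
            return d * 1.001d;
        }

        public static void main(String[] args) {
            double unordered = 1000d;                            // ScanOrder.NONE
            double forward = addEpsilon(unordered);              // 1001.0, ScanOrder.FORWARD
            double reverse = addEpsilon(addEpsilon(unordered));  // 1002.0, ScanOrder.REVERSE
            // Ordered scans cost only marginally more, so the planner chooses
            // row-key order only when it actually avoids a sort higher up.
            System.out.println(unordered + " < " + forward + " < " + reverse);
        }
    }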

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
index 70bc71c,0000000..57393c8
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
@@@ -1,64 -1,0 +1,64 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.util.List;
 +
 +import org.apache.calcite.linq4j.Ord;
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.Union;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.execute.UnionPlan;
 +import org.apache.phoenix.parse.SelectStatement;
 +import com.google.common.collect.Lists;
 +
 +/**
 + * Implementation of {@link org.apache.calcite.rel.core.Union}
 + * relational expression in Phoenix.
 + */
 +public class PhoenixUnion extends Union implements PhoenixRel {
 +    
 +    public static PhoenixUnion create(List<RelNode> inputs, boolean all) {
 +        RelOptCluster cluster = inputs.get(0).getCluster();
 +        RelTraitSet traits = cluster.traitSetOf(PhoenixConvention.CLIENT);
 +        return new PhoenixUnion(cluster, traits, inputs, all);
 +    }
 +    
 +    private PhoenixUnion(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
 +        super(cluster, traits, inputs, all);
 +    }
 +
 +    @Override
 +    public PhoenixUnion copy(RelTraitSet traits, List<RelNode> inputs, boolean all) {
 +        return create(inputs, all);
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
 +        for (RelNode input : getInputs()) {
 +            if (!input.getConvention().satisfies(PhoenixConvention.GENERIC)) {
 +                return planner.getCostFactory().makeInfiniteCost();
 +            }
 +        }
 +        
 +        return super.computeSelfCost(planner, mq)
 +                .multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size());
 +        for (Ord<RelNode> input : Ord.zip(inputs)) {
 +            subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e));
 +        }
 +        }
 +        
 +        return new UnionPlan(subPlans.get(0).getContext(), SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(), RowProjector.EMPTY_PROJECTOR,
-                 null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, subPlans, null);
++                null, null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, subPlans, null);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
index 83947ae,0000000..c0fd272
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
@@@ -1,136 -1,0 +1,136 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
 +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 +
 +import java.sql.Connection;
 +import java.sql.DriverManager;
 +import java.sql.SQLException;
 +import java.util.Iterator;
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelDistribution;
 +import org.apache.calcite.rel.RelDistributionTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.Values;
 +import org.apache.calcite.rel.metadata.RelMdCollation;
 +import org.apache.calcite.rel.metadata.RelMdDistribution;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.rex.RexLiteral;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.calcite.CalciteUtils;
 +import org.apache.phoenix.calcite.TableMapping;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.ColumnResolver;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.execute.LiteralResultIterationPlan;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.jdbc.PhoenixConnection;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.parse.SelectStatement;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.TableRef;
 +import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 +import org.apache.phoenix.schema.tuple.Tuple;
 +
 +import com.google.common.base.Supplier;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Lists;
 +
 +/**
 + * Implementation of {@link org.apache.calcite.rel.core.Values}
 + * relational expression in Phoenix.
 + */
 +public class PhoenixValues extends Values implements PhoenixRel {
 +    
 +    private static final PhoenixConnection phoenixConnection;
 +    static {
 +        try {
 +            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
 +            final Connection connection =
 +                DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS);
 +            phoenixConnection =
 +                connection.unwrap(PhoenixConnection.class);
 +        } catch (ClassNotFoundException e) {
 +            throw new RuntimeException(e);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
 +    public static PhoenixValues create(RelOptCluster cluster, final RelDataType rowType, final ImmutableList<ImmutableList<RexLiteral>> tuples) {
 +        final RelMetadataQuery mq = RelMetadataQuery.instance();
 +        final RelTraitSet traits =
 +                cluster.traitSetOf(PhoenixConvention.CLIENT)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        return RelMdCollation.values(mq, rowType, tuples);
 +                    }
 +                })
 +                .replaceIf(RelDistributionTraitDef.INSTANCE,
 +                        new Supplier<RelDistribution>() {
 +                    public RelDistribution get() {
 +                        return RelMdDistribution.values(rowType, tuples);
 +                    }
 +                });
 +        return new PhoenixValues(cluster, rowType, tuples, traits);
 +    }
 +    
 +    private PhoenixValues(RelOptCluster cluster, RelDataType rowType, ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) {
 +        super(cluster, rowType, tuples, traits);
 +    }
 +
 +    @Override
 +    public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) {
 +        assert inputs.isEmpty();
 +        return create(getCluster(), rowType, tuples);
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
 +        return super.computeSelfCost(planner, mq).multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        List<Tuple> literalResult = Lists.newArrayList();
 +        Iterator<ImmutableList<RexLiteral>> iter = getTuples().iterator();
 +        Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
 +        while (iter.hasNext()) {
 +            ImmutableList<RexLiteral> row = iter.next();
 +            List<Expression> exprs = Lists.newArrayListWithExpectedSize(row.size());
 +            for (RexLiteral rexLiteral : row) {
 +                exprs.add(CalciteUtils.toExpression(rexLiteral, implementor));
 +            }
 +            TupleProjector projector = implementor.project(exprs);
 +            literalResult.add(projector.projectResults(baseTuple));
 +        }
 +        PTable projectedTable = implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns);
 +        TableMapping tableMapping = new TableMapping(projectedTable);
 +        implementor.setTableMapping(tableMapping);
 +        
 +        try {
 +            PhoenixStatement stmt = new PhoenixStatement(phoenixConnection);
 +            ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef());
 +            StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
-             return new LiteralResultIterationPlan(literalResult, context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
++            return new LiteralResultIterationPlan(literalResult, context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, null, OrderBy.EMPTY_ORDER_BY, null);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +}
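
PhoenixValues compiles its literal rows against the connectionless Phoenix connection created in the static initializer above, so no HBase cluster is involved. A standalone sketch of that bootstrap (the class name and the println are ours; the URL constants come from PhoenixRuntime, as in the imports above):

    // ConnectionlessDemo.java -- illustrative only.
    import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
    import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
    import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;

    import java.sql.Connection;
    import java.sql.DriverManager;

    import org.apache.phoenix.jdbc.PhoenixConnection;

    public class ConnectionlessDemo {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            // Connectionless URL: no cluster needed to compile literal-only plans.
            try (Connection connection = DriverManager.getConnection(
                    JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS)) {
                PhoenixConnection pc = connection.unwrap(PhoenixConnection.class);
                System.out.println("Got connectionless connection: " + pc);
            }
        }
    }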

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 794247b,b125ecc..e5ea255
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@@ -77,23 -76,20 +77,24 @@@ public class AggregatePlan extends Base
      private final Expression having;
      private List<KeyRange> splits;
      private List<List<Scan>> scans;
 +    
 +    public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) {
-         return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving(), plan.dynamicFilter);
++        return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving(), plan.dynamicFilter);
 +    }
  
-     public AggregatePlan(
-             StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
-             Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy,
-             Expression having) {
-         this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, groupBy, having, null);
+     public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
+             RowProjector projector, Integer limit, Integer offset, OrderBy orderBy,
+             ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having) {
+         this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, groupBy, having,
+                 null);
      }
-     
-     public AggregatePlan(
-             StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
-             Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy,
-             Expression having, Expression dynamicFilter) {
-         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, groupBy, parallelIteratorFactory, dynamicFilter);
+ 
 -    private AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
++    public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
+             RowProjector projector, Integer limit, Integer offset, OrderBy orderBy,
+             ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having,
+             Expression dynamicFilter) {
+         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, offset,
+                 orderBy, groupBy, parallelIteratorFactory, dynamicFilter);
          this.having = having;
          this.aggregators = context.getAggregationManager().getAggregators();
      }
@@@ -245,13 -251,4 +254,13 @@@
      public boolean useRoundRobinIterator() throws SQLException {
          return false;
      }
 +
 +    @Override
 +    public QueryPlan limit(Integer limit) {
 +        if (limit == this.limit || (limit != null && limit.equals(this.limit)))
 +            return this;
 +        
 +        return new AggregatePlan(this.context, this.statement, this.tableRef, this.projection,
-             limit, this.orderBy, this.parallelIteratorFactory, this.groupBy, this.having, this.dynamicFilter);
++            limit, this.offset, this.orderBy, this.parallelIteratorFactory, this.groupBy, this.having, this.dynamicFilter);
 +    }
  }
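
The new limit(Integer) override above returns the plan unchanged when the limit is already equal, and otherwise builds a copy that carries the existing offset along. A hypothetical caller-side sketch (variable names assumed, not part of the patch):

    QueryPlan limited = aggPlan.limit(10);    // copy of the plan with limit = 10, offset preserved
    QueryPlan same = aggPlan.limit(null);     // same object back if aggPlan's limit was already null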

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e0dc286a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
