Fix for merge

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8c19e1c1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8c19e1c1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8c19e1c1

Branch: refs/heads/calcite
Commit: 8c19e1c13f2c59c4ff8a974cf351e5a3faae3eab
Parents: 7167262 2acb38a
Author: maryannxue <wei....@intel.com>
Authored: Thu Sep 10 09:58:30 2015 -0400
Committer: maryannxue <wei....@intel.com>
Committed: Thu Sep 10 09:58:30 2015 -0400

----------------------------------------------------------------------
 .../phoenix/end2end/RowValueConstructorIT.java  |  69 ++++++
 .../org/apache/phoenix/end2end/SortOrderIT.java |  27 ++
 .../java/org/apache/phoenix/end2end/ViewIT.java |  24 ++
 .../iterate/MockParallelIteratorFactory.java    |  47 ++++
 .../iterate/MockTableResultIterator.java        |  66 +++++
 .../iterate/RoundRobinResultIteratorIT.java     |  40 ++-
 .../RoundRobinResultIteratorWithStatsIT.java    | 104 ++++++++
 .../phoenix/calcite/rel/PhoenixTableScan.java   |   2 +-
 .../phoenix/calcite/rel/PhoenixValues.java      |   2 +-
 .../phoenix/compile/OrderPreservingTracker.java |  21 +-
 .../org/apache/phoenix/compile/ScanRanges.java  | 136 +++++++---
 .../apache/phoenix/compile/WhereOptimizer.java  |  72 +++---
 .../coprocessor/MetaDataEndpointImpl.java       |   7 +-
 .../apache/phoenix/execute/AggregatePlan.java   |  13 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  22 +-
 .../apache/phoenix/execute/CorrelatePlan.java   | 218 ++++++++++++++++
 .../phoenix/execute/DegenerateQueryPlan.java    |   2 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |   9 +-
 .../execute/LiteralResultIterationPlan.java     |  13 +-
 .../apache/phoenix/execute/RuntimeContext.java  |  33 +++
 .../phoenix/execute/RuntimeContextImpl.java     |  86 +++++++
 .../org/apache/phoenix/execute/ScanPlan.java    |  25 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |   5 +-
 .../apache/phoenix/execute/UnnestArrayPlan.java |   8 +-
 .../CorrelateVariableFieldAccessExpression.java |  75 ++++++
 .../phoenix/expression/ExpressionType.java      |  11 +-
 .../visitor/CloneExpressionVisitor.java         |   6 +
 .../expression/visitor/ExpressionVisitor.java   |   2 +
 .../StatelessTraverseAllExpressionVisitor.java  |   7 +
 .../StatelessTraverseNoExpressionVisitor.java   |   7 +
 .../apache/phoenix/filter/SkipScanFilter.java   |  28 ++-
 .../phoenix/index/PhoenixIndexBuilder.java      |   5 +-
 .../phoenix/iterate/BaseResultIterators.java    |  14 +-
 .../apache/phoenix/iterate/ExplainTable.java    |   7 +-
 .../UngroupedAggregatingResultIterator.java     |   3 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  16 ++
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |   6 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   3 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |   6 +-
 .../org/apache/phoenix/util/KeyValueUtil.java   |  91 ++-----
 .../java/org/apache/phoenix/util/ScanUtil.java  |  49 ++--
 .../phoenix/compile/QueryCompilerTest.java      |  18 +-
 .../compile/ScanRangesIntersectTest.java        |  37 +--
 .../apache/phoenix/compile/ScanRangesTest.java  |   3 +-
 .../TenantSpecificViewIndexCompileTest.java     | 172 +++++++++++++
 .../phoenix/compile/ViewCompilerTest.java       |   1 -
 .../phoenix/compile/WhereOptimizerTest.java     |  22 +-
 .../phoenix/execute/CorrelatePlanTest.java      | 248 +++++++++++++++++++
 .../phoenix/execute/UnnestArrayPlanTest.java    |   6 +-
 .../query/ParallelIteratorsSplitTest.java       |   3 +-
 .../org/apache/phoenix/query/QueryPlanTest.java |  10 +-
 .../org/apache/phoenix/util/ScanUtilTest.java   |  10 +-
 52 files changed, 1617 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index b8e97ed,0000000..171dd26
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@@ -1,229 -1,0 +1,229 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.sql.SQLException;
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelOptTable;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.RelWriter;
 +import org.apache.calcite.rel.core.TableScan;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.hadoop.hbase.HConstants;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.calcite.CalciteUtils;
 +import org.apache.phoenix.calcite.PhoenixTable;
 +import org.apache.phoenix.compile.ColumnResolver;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.ScanRanges;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.compile.WhereCompiler;
 +import org.apache.phoenix.compile.WhereOptimizer;
 +import org.apache.phoenix.execute.ScanPlan;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.iterate.ParallelIteratorFactory;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.parse.SelectStatement;
 +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 +import org.apache.phoenix.schema.PColumn;
 +import org.apache.phoenix.schema.PColumnFamily;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.TableRef;
 +import org.apache.phoenix.util.SchemaUtil;
 +
 +import com.google.common.base.Supplier;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Lists;
 +
 +/**
 + * Scan of a Phoenix table.
 + */
 +public class PhoenixTableScan extends TableScan implements PhoenixRel {
 +    public final RexNode filter;
 +    
 +    private final ScanRanges scanRanges;
 +    
 +    /**
 +     * This will not make a difference in implement(), but rather give a more accurate
 +     * estimate of the row count.
 +     */
 +    public final Integer statelessFetch;
 +    
 +    public static PhoenixTableScan create(RelOptCluster cluster, final 
RelOptTable table, 
 +            RexNode filter, Integer statelessFetch) {
 +        final RelTraitSet traits =
 +                cluster.traitSetOf(PhoenixRel.SERVER_CONVENTION)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        if (table != null) {
 +                            return 
table.unwrap(PhoenixTable.class).getStatistic().getCollations();
 +                        }
 +                        return ImmutableList.of();
 +                    }
 +                });
 +        return new PhoenixTableScan(cluster, traits, table, filter, 
statelessFetch);
 +    }
 +
 +    private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, 
RelOptTable table, RexNode filter, Integer statelessFetch) {
 +        super(cluster, traits, table);
 +        this.filter = filter;
 +        this.statelessFetch = statelessFetch;
 +        
 +        ScanRanges scanRanges = null;
 +        if (filter != null) {
 +            try {
 +                // TODO simplify this code
 +                final PhoenixTable phoenixTable = 
table.unwrap(PhoenixTable.class);
 +                PTable pTable = phoenixTable.getTable();
 +                TableRef tableRef = new 
TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, 
false);
 +                Implementor tmpImplementor = new PhoenixRelImplementorImpl();
 +                tmpImplementor.setTableRef(tableRef);
 +                SelectStatement select = SelectStatement.SELECT_ONE;
 +                PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
 +                ColumnResolver resolver = FromCompiler.getResolver(tableRef);
 +                StatementContext context = new StatementContext(stmt, 
resolver, new Scan(), new SequenceManager(stmt));
 +                Expression filterExpr = CalciteUtils.toExpression(filter, 
tmpImplementor);
 +                filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, 
select, filterExpr);
 +                scanRanges = context.getScanRanges();
 +            } catch (SQLException e) {
 +                throw new RuntimeException(e);
 +            }
 +        }        
 +        this.scanRanges = scanRanges;
 +    }
 +
 +    @Override
 +    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
 +        assert inputs.isEmpty();
 +        return this;
 +    }
 +
 +    @Override
 +    public RelWriter explainTerms(RelWriter pw) {
 +        return super.explainTerms(pw)
 +            .itemIf("filter", filter, filter != null)
 +            .itemIf("statelessFetch", statelessFetch, statelessFetch != null);
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner) {
 +        double rowCount = super.getRows();
 +        Double filteredRowCount = null;
 +        if (scanRanges != null) {
 +            if (scanRanges.isPointLookup()) {
 +                filteredRowCount = 1.0;
-             } else if (scanRanges.getPkColumnSpan() > 0) {
++            } else if (scanRanges.getBoundPkColumnCount() > 0) {
 +                // TODO
 +                filteredRowCount = rowCount * 
RelMetadataQuery.getSelectivity(this, filter);
 +            }
 +        }
 +        if (filteredRowCount != null) {
 +            rowCount = filteredRowCount;
 +        } else if 
(table.unwrap(PhoenixTable.class).getTable().getParentName() != null){
 +            rowCount = addEpsilon(rowCount);
 +        }
 +        int fieldCount = this.table.getRowType().getFieldCount();
 +        return planner.getCostFactory()
 +                .makeCost(rowCount * 2 * fieldCount / (fieldCount + 1), 
rowCount + 1, 0)
 +                .multiplyBy(PHOENIX_FACTOR);
 +    }
 +    
 +    @Override
 +    public double getRows() {
 +        double rows = super.getRows();
 +        if (filter != null && !filter.isAlwaysTrue()) {
 +            rows = rows * RelMetadataQuery.getSelectivity(this, filter);
 +        }        
 +        if (statelessFetch == null)
 +            return rows;
 +        
 +        return Math.min(statelessFetch, rows);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
 +        PTable pTable = phoenixTable.getTable();
 +        TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), 
pTable, HConstants.LATEST_TIMESTAMP, false);
 +        implementor.setTableRef(tableRef);
 +        try {
 +            PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
 +            ColumnResolver resolver = FromCompiler.getResolver(tableRef);
 +            StatementContext context = new StatementContext(stmt, resolver, 
new Scan(), new SequenceManager(stmt));
 +            SelectStatement select = SelectStatement.SELECT_ONE;
 +            if (filter != null) {
 +                Expression filterExpr = CalciteUtils.toExpression(filter, 
implementor);
 +                filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, 
select, filterExpr);
 +                WhereCompiler.setScanFilter(context, select, filterExpr, 
true, false);
 +            }
 +            projectAllColumnFamilies(context.getScan(), 
phoenixTable.getTable());
 +            if (implementor.getCurrentContext().forceProject()) {
 +                TupleProjector tupleProjector = 
createTupleProjector(implementor, phoenixTable.getTable());
 +                TupleProjector.serializeProjectorIntoScan(context.getScan(), 
tupleProjector);
 +                PTable projectedTable = implementor.createProjectedTable();
 +                implementor.setTableRef(new TableRef(projectedTable));
 +            }
 +            Integer limit = null;
 +            OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
 +            ParallelIteratorFactory iteratorFactory = null;
 +            return new ScanPlan(context, select, tableRef, 
RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
 +    private TupleProjector createTupleProjector(Implementor implementor, 
PTable table) {
 +        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
 +        List<Expression> exprs = Lists.<Expression> newArrayList();
 +        for (PColumn column : table.getColumns()) {
 +            if (!SchemaUtil.isPKColumn(column) || 
!implementor.getCurrentContext().isRetainPKColumns()) {
 +                Expression expr = 
implementor.newColumnExpression(column.getPosition());
 +                exprs.add(expr);
 +                builder.addField(expr);                
 +            }
 +        }
 +        
 +        return new TupleProjector(builder.build(), exprs.toArray(new 
Expression[exprs.size()]));
 +    }
 +    
 +    // TODO only project needed columns
 +    private void projectAllColumnFamilies(Scan scan, PTable table) {
 +        scan.getFamilyMap().clear();
 +        for (PColumnFamily family : table.getColumnFamilies()) {
 +            scan.addFamily(family.getName().getBytes());
 +        }
 +    }
 +
 +    private double addEpsilon(double d) {
 +      assert d >= 0d;
 +      final double d0 = d;
 +      if (d < 10) {
 +        // For small d, adding 1 would change the value significantly.
 +        d *= 1.001d;
 +        if (d != d0) {
 +          return d;
 +        }
 +      }
 +      // For medium d, add 1. Keeps integral values integral.
 +      ++d;
 +      if (d != d0) {
 +        return d;
 +      }
 +      // For large d, adding 1 might not change the value. Add .1%.
 +      // If d is NaN, this still will probably not change the value. That's OK.
 +      d *= 1.001d;
 +      return d;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
index b65b1b8,0000000..89aaa07
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
@@@ -1,129 -1,0 +1,129 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
 +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 +import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 +
 +import java.sql.Connection;
 +import java.sql.DriverManager;
 +import java.sql.SQLException;
 +import java.util.Iterator;
 +import java.util.List;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelCollation;
 +import org.apache.calcite.rel.RelCollationTraitDef;
 +import org.apache.calcite.rel.RelDistribution;
 +import org.apache.calcite.rel.RelDistributionTraitDef;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.Values;
 +import org.apache.calcite.rel.metadata.RelMdCollation;
 +import org.apache.calcite.rel.metadata.RelMdDistribution;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.rex.RexLiteral;
 +import org.apache.hadoop.hbase.KeyValue;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.calcite.CalciteUtils;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.ColumnResolver;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.execute.LiteralResultIterationPlan;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.jdbc.PhoenixConnection;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.parse.SelectStatement;
 +import org.apache.phoenix.schema.TableRef;
 +import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 +import org.apache.phoenix.schema.tuple.Tuple;
 +
 +import com.google.common.base.Supplier;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Lists;
 +
 +/**
 + * Implementation of {@link org.apache.calcite.rel.core.Values}
 + * relational expression in Phoenix.
 + */
 +public class PhoenixValues extends Values implements PhoenixRel {
 +    
 +    private static final PhoenixConnection phoenixConnection;
 +    static {
 +        try {
 +            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
 +            final Connection connection =
 +                DriverManager.getConnection(JDBC_PROTOCOL + 
JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS);
 +            phoenixConnection =
 +                connection.unwrap(PhoenixConnection.class);
 +        } catch (ClassNotFoundException e) {
 +            throw new RuntimeException(e);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
 +    public static PhoenixValues create(RelOptCluster cluster, final 
RelDataType rowType, final ImmutableList<ImmutableList<RexLiteral>> tuples) {
 +        final RelTraitSet traits =
 +                cluster.traitSetOf(PhoenixRel.CLIENT_CONVENTION)
 +                .replaceIfs(RelCollationTraitDef.INSTANCE,
 +                        new Supplier<List<RelCollation>>() {
 +                    public List<RelCollation> get() {
 +                        return RelMdCollation.values(rowType, tuples);
 +                    }
 +                })
 +                .replaceIf(RelDistributionTraitDef.INSTANCE,
 +                        new Supplier<RelDistribution>() {
 +                    public RelDistribution get() {
 +                        return RelMdDistribution.values(rowType, tuples);
 +                    }
 +                });
 +        return new PhoenixValues(cluster, rowType, tuples, traits);
 +    }
 +    
 +    private PhoenixValues(RelOptCluster cluster, RelDataType rowType, 
ImmutableList<ImmutableList<RexLiteral>> tuples, RelTraitSet traits) {
 +        super(cluster, rowType, tuples, traits);
 +    }
 +
 +    @Override
 +    public PhoenixValues copy(RelTraitSet traitSet, List<RelNode> inputs) {
 +        assert inputs.isEmpty();
 +        return create(getCluster(), rowType, tuples);
 +    }
 +
 +    @Override
 +    public RelOptCost computeSelfCost(RelOptPlanner planner) {
 +        return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
 +    }
 +
 +    @Override
 +    public QueryPlan implement(Implementor implementor) {
 +        List<Tuple> literalResult = Lists.newArrayList();
 +        Iterator<ImmutableList<RexLiteral>> iter = getTuples().iterator();
 +        Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
 +        while (iter.hasNext()) {
 +            ImmutableList<RexLiteral> row = iter.next();
 +            List<Expression> exprs = 
Lists.newArrayListWithExpectedSize(row.size());
 +            for (RexLiteral rexLiteral : row) {
 +                exprs.add(CalciteUtils.toExpression(rexLiteral, implementor));
 +            }
 +            TupleProjector projector = implementor.project(exprs);
 +            literalResult.add(projector.projectResults(baseTuple));
 +        }
 +        
 +        try {
 +            PhoenixStatement stmt = new PhoenixStatement(phoenixConnection);
 +            ColumnResolver resolver = 
FromCompiler.getResolver(implementor.getTableRef());
 +            StatementContext context = new StatementContext(stmt, resolver, 
new Scan(), new SequenceManager(stmt));
-             return new LiteralResultIterationPlan(literalResult.iterator(), 
context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, 
RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
++            return new LiteralResultIterationPlan(literalResult, context, 
SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, 
RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 598ead2,9a415b9..da55fb5
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@@ -73,10 -72,6 +73,10 @@@ public class AggregatePlan extends Base
      private final Expression having;
      private List<KeyRange> splits;
      private List<List<Scan>> scans;
 +    
 +    public static AggregatePlan create(AggregatePlan plan, OrderBy 
newOrderBy) {
-         return new AggregatePlan(plan.getContext(), plan.getStatement(), 
plan.getTableRef(), plan.getProjector(), null, newOrderBy, 
plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving());
++        return new AggregatePlan(plan.getContext(), plan.getStatement(), 
plan.getTableRef(), plan.getProjector(), null, newOrderBy, 
plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving(), 
plan.dynamicFilter);
 +    }
  
      public AggregatePlan(
              StatementContext context, FilterableStatement statement, TableRef 
table, RowProjector projector,
@@@ -221,13 -223,4 +228,13 @@@
      public boolean useRoundRobinIterator() throws SQLException {
          return false;
      }
 +
 +    @Override
 +    public QueryPlan limit(Integer limit) {
 +        if (limit == this.limit || (limit != null && 
limit.equals(this.limit)))
 +            return this;
 +        
 +        return new AggregatePlan(this.context, this.statement, this.tableRef, 
this.projection,
-             limit, this.orderBy, this.parallelIteratorFactory, this.groupBy, 
this.having);
++            limit, this.orderBy, this.parallelIteratorFactory, this.groupBy, 
this.having, this.dynamicFilter);
 +    }
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 0000000,1b0af8c..1981c4b
mode 000000,100644..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@@ -1,0 -1,208 +1,218 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.phoenix.execute;
+ 
+ import java.io.IOException;
+ import java.sql.SQLException;
+ import java.util.List;
+ 
+ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.phoenix.compile.ExplainPlan;
++import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+ import org.apache.phoenix.compile.QueryPlan;
+ import org.apache.phoenix.exception.SQLExceptionCode;
+ import org.apache.phoenix.exception.SQLExceptionInfo;
+ import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+ import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+ import org.apache.phoenix.iterate.ParallelScanGrouper;
+ import org.apache.phoenix.iterate.ResultIterator;
+ import org.apache.phoenix.parse.JoinTableNode.JoinType;
+ import org.apache.phoenix.schema.KeyValueSchema;
+ import org.apache.phoenix.schema.PColumn;
+ import org.apache.phoenix.schema.PTable;
+ import org.apache.phoenix.schema.ValueBitSet;
+ import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+ import org.apache.phoenix.schema.tuple.Tuple;
+ import org.apache.phoenix.util.SchemaUtil;
+ 
+ import com.google.common.collect.Lists;
+ 
+ public class CorrelatePlan extends DelegateQueryPlan {    
+     private final QueryPlan rhs;
+     private final String variableId;
+     private final JoinType joinType;
+     private final boolean isSingleValueOnly;
+     private final RuntimeContext runtimeContext;
+     private final KeyValueSchema joinedSchema;
+     private final KeyValueSchema lhsSchema;
+     private final KeyValueSchema rhsSchema;
+     private final int rhsFieldPosition;
+ 
+     public CorrelatePlan(QueryPlan lhs, QueryPlan rhs, String variableId, 
+             JoinType joinType, boolean isSingleValueOnly, 
+             RuntimeContext runtimeContext, PTable joinedTable, 
+             PTable lhsTable, PTable rhsTable, int rhsFieldPosition) {
+         super(lhs);
+         if (joinType != JoinType.Inner && joinType != JoinType.Left && 
joinType != JoinType.Semi && joinType != JoinType.Anti)
+             throw new IllegalArgumentException("Unsupported join type '" + 
joinType + "' by CorrelatePlan");
+         
+         this.rhs = rhs;
+         this.variableId = variableId;
+         this.joinType = joinType;
+         this.isSingleValueOnly = isSingleValueOnly;
+         this.runtimeContext = runtimeContext;
+         this.joinedSchema = buildSchema(joinedTable);
+         this.lhsSchema = buildSchema(lhsTable);
+         this.rhsSchema = buildSchema(rhsTable);
+         this.rhsFieldPosition = rhsFieldPosition;
+     }
+ 
+     private static KeyValueSchema buildSchema(PTable table) {
+         KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+         if (table != null) {
+             for (PColumn column : table.getColumns()) {
+                 if (!SchemaUtil.isPKColumn(column)) {
+                     builder.addField(column);
+                 }
+             }
+         }
+         return builder.build();
+     }
+ 
+     @Override
+     public ExplainPlan getExplainPlan() throws SQLException {
+         List<String> steps = Lists.newArrayList();
+         steps.add("NESTED-LOOP-JOIN (" + joinType.toString().toUpperCase() + 
") TABLES");
+         for (String step : delegate.getExplainPlan().getPlanSteps()) {
+             steps.add("    " + step);            
+         }
+         steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : 
""));
+         for (String step : rhs.getExplainPlan().getPlanSteps()) {
+             steps.add("    " + step);            
+         }
+         return new ExplainPlan(steps);
+     }
+ 
+     @Override
+     public ResultIterator iterator() throws SQLException {
+         return iterator(DefaultParallelScanGrouper.getInstance());
+     }
+ 
+     @Override
+     public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+             throws SQLException {
+         return new ResultIterator() {
+             private final ValueBitSet destBitSet = 
ValueBitSet.newInstance(joinedSchema);
+             private final ValueBitSet lhsBitSet = 
ValueBitSet.newInstance(lhsSchema);
+             private final ValueBitSet rhsBitSet = 
+                     (joinType == JoinType.Semi || joinType == JoinType.Anti) ?
+                             ValueBitSet.EMPTY_VALUE_BITSET 
+                           : ValueBitSet.newInstance(rhsSchema);
+             private final ResultIterator iter = delegate.iterator();
+             private ResultIterator rhsIter = null;
+             private Tuple current = null;
+             private boolean closed = false;
+ 
+             @Override
+             public void close() throws SQLException {
+                 if (!closed) {
+                     closed = true;
+                     iter.close();
+                     if (rhsIter != null) {
+                         rhsIter.close();
+                     }
+                 }
+             }
+ 
+             @Override
+             public Tuple next() throws SQLException {
+                 if (closed)
+                     return null;
+                 
+                 Tuple rhsCurrent = null;
+                 if (rhsIter != null) {
+                     rhsCurrent = rhsIter.next();
+                     if (rhsCurrent == null) {
+                         rhsIter.close();
+                         rhsIter = null;
+                     } else if (isSingleValueOnly) {
+                         throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS).build().buildException();
+                     }
+                 }
+                 while (rhsIter == null) {
+                     current = iter.next();
+                     if (current == null) {
+                         close();
+                         return null;
+                     }
+                     runtimeContext.setCorrelateVariableValue(variableId, 
current);
+                     rhsIter = rhs.iterator();
+                     rhsCurrent = rhsIter.next();
+                     if ((rhsCurrent == null && (joinType == JoinType.Inner || 
joinType == JoinType.Semi))
+                             || (rhsCurrent != null && joinType == 
JoinType.Anti)) {
+                         rhsIter.close();
+                         rhsIter = null;
+                     }
+                 }
+                 
+                 Tuple joined;
+                 try {
+                     joined = rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ?
+                             current : TupleProjector.mergeProjectedValue(
+                                     convertLhs(current), joinedSchema, 
destBitSet,
+                                     rhsCurrent, rhsSchema, rhsBitSet, 
rhsFieldPosition);
+                 } catch (IOException e) {
+                     throw new SQLException(e);
+                 }
+                                 
+                 if ((joinType == JoinType.Semi || rhsCurrent == null) && 
rhsIter != null) {
+                     rhsIter.close();
+                     rhsIter = null;
+                 }
+                 
+                 return joined;
+             }
+ 
+             @Override
+             public void explain(List<String> planSteps) {
+             }
+             
+             private ProjectedValueTuple convertLhs(Tuple lhs) throws 
IOException {
+                 ProjectedValueTuple t;
+                 if (lhs instanceof ProjectedValueTuple) {
+                     t = (ProjectedValueTuple) lhs;
+                 } else {
+                     ImmutableBytesWritable ptr = getContext().getTempPtr();
+                     TupleProjector.decodeProjectedValue(lhs, ptr);
+                     lhsBitSet.clear();
+                     lhsBitSet.or(ptr);
+                     int bitSetLen = lhsBitSet.getEstimatedLength();
+                     t = new ProjectedValueTuple(lhs, 
lhs.getValue(0).getTimestamp(), 
+                             ptr.get(), ptr.getOffset(), ptr.getLength(), 
bitSetLen);
+ 
+                 }
+                 return t;
+             }
+         };
+     }
+ 
+     @Override
+     public Integer getLimit() {
+         return null;
+     }
+ 
++    @Override
++    public QueryPlan limit(Integer limit) {
++        if (limit == null)
++            return this;
++        
++        return new ClientScanPlan(this.getContext(), this.getStatement(), 
this.getTableRef(),
++                this.getProjector(), limit, null, OrderBy.EMPTY_ORDER_BY, 
this);
++    }
++
+ }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 8cf39ba,72920b2..bbac5a5
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@@ -113,16 -114,10 +113,18 @@@ public class HashJoinPlan extends Deleg
          this.joinInfo = joinInfo;
          this.subPlans = subPlans;
          this.recompileWhereClause = recompileWhereClause;
+         this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt(
+                 QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
      }
      
 +    public HashJoinInfo getJoinInfo() {
 +        return this.joinInfo;
 +    }
 +    
 +    public SubPlan[] getSubPlans() {
 +        return this.subPlans;
 +    }
 +
      @Override
      public ResultIterator iterator() throws SQLException {
        return iterator(DefaultParallelScanGrouper.getInstance());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 58c78d2,ab13e6c..e9fac80
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@@ -106,13 -106,4 +107,13 @@@ public class LiteralResultIterationPla
          return scanner;
      }
  
 +    @Override
 +    public QueryPlan limit(Integer limit) {
 +        if (limit == this.limit || (limit != null && limit.equals(this.limit)))
 +            return this;
 +        
-         return new LiteralResultIterationPlan(this.tupleIterator, this.context, this.statement, this.tableRef,
++        return new LiteralResultIterationPlan(this.tuples, this.context, this.statement, this.tableRef,
 +                this.projection, limit, this.orderBy, this.parallelIteratorFactory);
 +    }
 +
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 1e18aa6,9f7e482..2d408bc
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@@ -77,20 -76,15 +77,19 @@@ public class ScanPlan extends BaseQuery
      private List<KeyRange> splits;
      private List<List<Scan>> scans;
      private boolean allowPageFilter;
 -    
 +
 +    public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException {
-         return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter);
++        return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getProjector(), null, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter, plan.dynamicFilter);
 +    }
-     
++        
      public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
-         this(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy,
-                 parallelIteratorFactory != null ? parallelIteratorFactory :
-                     buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter),
-                 allowPageFilter);
+         this(context, statement, table, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter, null);
      }
      
-     private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, ParameterMetaData paramMetaData, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) {
-         super(context, statement, table, projector, paramMetaData, limit, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory);
+     private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
+         super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, orderBy, GroupBy.EMPTY_GROUP_BY,
+                 parallelIteratorFactory != null ? parallelIteratorFactory :
+                         buildResultIteratorFactory(context, table, orderBy, limit, allowPageFilter), dynamicFilter);
          this.allowPageFilter = allowPageFilter;
          if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
              int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
@@@ -229,13 -223,4 +228,17 @@@
          return ScanUtil.isRoundRobinPossible(orderBy, context);
      }
  
 +    @Override
 +    public QueryPlan limit(Integer limit) {
 +        if (limit == this.limit || (limit != null && limit.equals(this.limit)))
 +            return this;
 +        
-         return new ScanPlan(this.context, this.statement, this.tableRef, this.projection,
-                 this.paramMetaData, limit, this.orderBy, this.parallelIteratorFactory, this.allowPageFilter);
++        try {
++            return new ScanPlan(this.context, this.statement, this.tableRef, this.projection,
++                    limit, this.orderBy, this.parallelIteratorFactory, this.allowPageFilter, this.dynamicFilter);
++        } catch (SQLException e) {
++            throw new RuntimeException(e);
++        }
 +    }
 +
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8c19e1c1/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------

Reply via email to