Fix sync errors

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c8c33a91
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c8c33a91
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c8c33a91

Branch: refs/heads/calcite
Commit: c8c33a918e4ca66cbcc2ffa3e15107f53a60f58b
Parents: c3613fd 13ae3f3
Author: maryannxue <maryann....@gmail.com>
Authored: Fri Oct 28 14:26:27 2016 -0700
Committer: maryannxue <maryann....@gmail.com>
Committed: Fri Oct 28 14:26:27 2016 -0700

----------------------------------------------------------------------
 bin/queryserver.py                              |    2 +-
 .../src/build/components/all-common-jars.xml    |   10 +-
 phoenix-client/pom.xml                          |    6 +
 .../phoenix/end2end/AbsFunctionEnd2EndIT.java   |    6 +-
 .../phoenix/end2end/AggregateQueryIT.java       |    2 +-
 .../phoenix/end2end/ArithmeticQueryIT.java      |   11 +-
 .../phoenix/end2end/ArrayFillFunctionIT.java    |    6 +-
 .../phoenix/end2end/DefaultColumnValueIT.java   | 1070 ++++++++++++++++++
 .../phoenix/end2end/ExecuteStatementsIT.java    |   12 +-
 .../phoenix/end2end/FirstValueFunctionIT.java   |   60 +-
 .../apache/phoenix/end2end/GroupByCaseIT.java   |   34 +-
 .../org/apache/phoenix/end2end/HashJoinIT.java  |   32 +-
 .../apache/phoenix/end2end/IndexExtendedIT.java |    7 +-
 .../phoenix/end2end/LastValueFunctionIT.java    |  148 +--
 .../phoenix/end2end/NthValueFunctionIT.java     |   48 +-
 .../phoenix/end2end/OnDuplicateKeyIT.java       |  523 +++++++++
 .../phoenix/end2end/ProductMetricsIT.java       |   68 +-
 .../org/apache/phoenix/end2end/QueryMoreIT.java |  107 +-
 .../phoenix/end2end/QueryWithOffsetIT.java      |    8 +
 .../phoenix/end2end/RoundFloorCeilFuncIT.java   |   17 +-
 .../phoenix/end2end/SignFunctionEnd2EndIT.java  |    6 +-
 .../apache/phoenix/end2end/SortMergeJoinIT.java |   40 +-
 .../org/apache/phoenix/end2end/SortOrderIT.java |    6 +-
 .../phoenix/end2end/StatsCollectorIT.java       |   53 +-
 .../end2end/SubqueryUsingSortMergeJoinIT.java   |    2 +-
 .../org/apache/phoenix/end2end/UnionAllIT.java  |    6 +-
 .../apache/phoenix/end2end/UpsertSelectIT.java  |   58 +-
 .../apache/phoenix/end2end/UpsertValuesIT.java  |   10 +-
 .../org/apache/phoenix/end2end/UseSchemaIT.java |   39 +
 .../phoenix/end2end/VariableLengthPKIT.java     |   46 +-
 .../index/ChildViewsUseParentViewIndexIT.java   |   17 +
 .../phoenix/end2end/index/IndexTestUtil.java    |    6 +-
 .../phoenix/end2end/index/LocalIndexIT.java     |   21 +
 .../end2end/index/MutableIndexFailureIT.java    |   13 +-
 .../phoenix/end2end/index/MutableIndexIT.java   |    5 +-
 .../phoenix/end2end/index/ViewIndexIT.java      |  260 ++++-
 .../phoenix/iterate/MockResultIterator.java     |    2 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |   15 +
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |   32 +-
 .../apache/phoenix/calcite/TableMapping.java    |    8 +-
 .../calcite/rel/PhoenixRelImplementor.java      |    3 +-
 .../calcite/rel/PhoenixRelImplementorImpl.java  |    3 +-
 .../phoenix/calcite/rel/PhoenixTableModify.java |   15 +-
 .../phoenix/compile/CreateTableCompiler.java    |   28 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |   14 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   10 +-
 .../apache/phoenix/compile/SequenceManager.java |    3 +
 .../apache/phoenix/compile/UpsertCompiler.java  |  165 ++-
 .../apache/phoenix/compile/WhereOptimizer.java  |   27 +-
 .../coprocessor/BaseScannerRegionObserver.java  |    1 +
 .../coprocessor/MetaDataEndpointImpl.java       |   17 +-
 .../phoenix/coprocessor/MetaDataProtocol.java   |    8 +-
 .../coprocessor/MetaDataRegionObserver.java     |  101 +-
 .../coprocessor/SequenceRegionObserver.java     |    2 +-
 .../UngroupedAggregateRegionObserver.java       |  514 +++++----
 .../exception/DataExceedsCapacityException.java |   14 +-
 .../phoenix/exception/SQLExceptionCode.java     |    9 +-
 .../phoenix/exception/SQLExceptionInfo.java     |    9 +-
 .../apache/phoenix/execute/MutationState.java   |   32 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |    6 +-
 .../apache/phoenix/execute/TupleProjector.java  |    3 +-
 .../org/apache/phoenix/execute/UnionPlan.java   |    3 +-
 .../phoenix/expression/ExpressionType.java      |    4 +-
 .../function/ArrayConcatFunction.java           |    5 +-
 .../function/ArrayModifierFunction.java         |    8 +-
 .../function/DefaultValueExpression.java        |   91 ++
 .../phoenix/filter/ColumnProjectionFilter.java  |   29 +-
 .../org/apache/phoenix/hbase/index/Indexer.java |   92 +-
 .../hbase/index/builder/BaseIndexBuilder.java   |   14 +-
 .../hbase/index/builder/IndexBuildManager.java  |   10 +
 .../hbase/index/builder/IndexBuilder.java       |   29 +-
 .../phoenix/hbase/index/covered/IndexCodec.java |    1 -
 .../hbase/index/util/IndexManagementUtil.java   |   13 +-
 .../hbase/index/util/KeyValueBuilder.java       |   15 +-
 .../apache/phoenix/index/IndexMaintainer.java   |    2 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |  319 ++++++
 .../phoenix/index/PhoenixIndexMetaData.java     |    7 +-
 .../phoenix/iterate/BaseResultIterators.java    |   89 +-
 .../iterate/MergeSortTopNResultIterator.java    |    3 +
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |    6 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   11 +-
 .../org/apache/phoenix/parse/ColumnDef.java     |   79 ++
 .../phoenix/parse/CreateSchemaStatement.java    |    4 +-
 .../phoenix/parse/CreateTableStatement.java     |   13 +
 .../apache/phoenix/parse/FunctionParseNode.java |   73 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |   21 +-
 .../apache/phoenix/parse/UpsertStatement.java   |   10 +-
 .../phoenix/parse/UseSchemaStatement.java       |    4 +-
 .../phoenix/query/ConnectionQueryServices.java  |   20 +-
 .../query/ConnectionQueryServicesImpl.java      |  122 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   28 +-
 .../query/DelegateConnectionQueryServices.java  |   12 +-
 .../apache/phoenix/query/GuidePostsCache.java   |  231 ++++
 .../apache/phoenix/query/QueryConstants.java    |    3 +-
 .../apache/phoenix/query/TableStatsCache.java   |  192 ----
 .../org/apache/phoenix/schema/ColumnRef.java    |   42 +-
 .../apache/phoenix/schema/DelegateColumn.java   |   10 +
 .../phoenix/schema/DelegateSQLException.java    |   62 +
 .../apache/phoenix/schema/DelegateTable.java    |   18 +-
 .../apache/phoenix/schema/MetaDataClient.java   |  137 +--
 .../org/apache/phoenix/schema/PColumnImpl.java  |   12 +-
 .../apache/phoenix/schema/PMetaDataImpl.java    |    2 +-
 .../java/org/apache/phoenix/schema/PRow.java    |   11 +-
 .../java/org/apache/phoenix/schema/PTable.java  |    6 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |  144 ++-
 .../apache/phoenix/schema/TableProperty.java    |    7 +
 .../stats/DefaultStatisticsCollector.java       |  145 ++-
 .../phoenix/schema/stats/GuidePostsInfo.java    |   77 +-
 .../schema/stats/GuidePostsInfoBuilder.java     |    4 +
 .../phoenix/schema/stats/GuidePostsKey.java     |   84 ++
 .../schema/stats/NoOpStatisticsCollector.java   |    9 +-
 .../phoenix/schema/stats/PTableStats.java       |   57 -
 .../phoenix/schema/stats/PTableStatsImpl.java   |  115 --
 .../schema/stats/StatisticsCollector.java       |   12 +-
 .../stats/StatisticsCollectorFactory.java       |   19 +-
 .../phoenix/schema/stats/StatisticsScanner.java |    3 +-
 .../phoenix/schema/stats/StatisticsUtil.java    |  135 +--
 .../phoenix/schema/stats/StatisticsWriter.java  |    6 +-
 .../phoenix/schema/types/PArrayDataType.java    |   11 +-
 .../apache/phoenix/schema/types/PBinary.java    |  334 +++---
 .../phoenix/schema/types/PBinaryBase.java       |   17 +
 .../org/apache/phoenix/schema/types/PChar.java  |   15 +-
 .../apache/phoenix/schema/types/PDataType.java  |    5 +-
 .../apache/phoenix/schema/types/PDecimal.java   |  669 +++++------
 .../apache/phoenix/schema/types/PVarbinary.java |  248 ++--
 .../apache/phoenix/schema/types/PVarchar.java   |  268 ++---
 .../org/apache/phoenix/util/ExpressionUtil.java |    1 -
 .../java/org/apache/phoenix/util/IndexUtil.java |    2 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |   44 +-
 .../org/apache/phoenix/util/PhoenixRuntime.java |   34 +
 .../java/org/apache/phoenix/util/ScanUtil.java  |    4 +
 .../org/apache/phoenix/util/SchemaUtil.java     |   11 +-
 .../org/apache/phoenix/util/UpgradeUtil.java    |    2 +-
 .../phoenix/compile/JoinQueryCompilerTest.java  |    2 +-
 .../phoenix/compile/QueryCompilerTest.java      |  352 ++++++
 .../phoenix/compile/QueryOptimizerTest.java     |   15 +-
 .../compile/StatementHintsCompilationTest.java  |    2 +-
 .../phoenix/compile/WhereCompilerTest.java      |    2 +-
 .../phoenix/compile/WhereOptimizerTest.java     |  114 ++
 .../phoenix/execute/CorrelatePlanTest.java      |    6 +-
 .../phoenix/filter/SkipScanBigFilterTest.java   |   25 +-
 .../jdbc/PhoenixResultSetMetadataTest.java      |    4 +-
 .../java/org/apache/phoenix/query/BaseTest.java |    2 +-
 .../PhoenixStatsCacheRemovalListenerTest.java   |    2 +-
 .../org/apache/phoenix/query/QueryPlanTest.java |   23 +-
 .../org/apache/phoenix/schema/MutationTest.java |   54 +
 .../apache/phoenix/util/PhoenixRuntimeTest.java |   44 +
 .../java/org/apache/phoenix/util/TestUtil.java  |   35 +-
 .../java/org/apache/phoenix/pherf/Pherf.java    |    6 +
 .../phoenix/pherf/workload/WriteWorkload.java   |   49 +-
 phoenix-pig/pom.xml                             |  263 +++++
 .../phoenix/end2end/QueryServerBasicsIT.java    |    9 +-
 .../phoenix/end2end/QueryServerThread.java      |   10 +-
 .../apache/phoenix/queryserver/server/Main.java |  333 ------
 .../phoenix/queryserver/server/QueryServer.java |  340 ++++++
 .../server/PhoenixDoAsCallbackTest.java         |    2 +-
 pom.xml                                         |    2 +-
 157 files changed, 7088 insertions(+), 2759 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
index a00facf,0000000..216920f
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
@@@ -1,366 -1,0 +1,370 @@@
 +package org.apache.phoenix.calcite;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.sql.SQLException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Set;
 +
 +import org.apache.calcite.plan.RelOptUtil.InputFinder;
 +import org.apache.calcite.rex.RexNode;
 +import org.apache.calcite.util.ImmutableBitSet;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.hbase.util.Pair;
 +import org.apache.hadoop.io.WritableUtils;
 +import org.apache.phoenix.compile.ColumnProjector;
 +import org.apache.phoenix.compile.ExpressionProjector;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.TupleProjectionCompiler;
 +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.expression.ColumnExpression;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.expression.LiteralExpression;
 +import org.apache.phoenix.index.IndexMaintainer;
 +import org.apache.phoenix.jdbc.PhoenixConnection;
 +import org.apache.phoenix.schema.ColumnRef;
 +import org.apache.phoenix.schema.PColumn;
 +import org.apache.phoenix.schema.PColumnFamily;
 +import org.apache.phoenix.schema.PName;
 +import org.apache.phoenix.schema.PNameFactory;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.PTableImpl;
 +import org.apache.phoenix.schema.PTableType;
 +import org.apache.phoenix.schema.ProjectedColumn;
 +import org.apache.phoenix.schema.TableRef;
 +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 +import org.apache.phoenix.schema.PTable.IndexType;
 +import org.apache.phoenix.util.ByteUtil;
 +import org.apache.phoenix.util.IndexUtil;
 +import org.apache.phoenix.util.SchemaUtil;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Sets;
 +
 +public class TableMapping {
 +    private final TableRef tableRef;
 +    private final TableRef dataTableRef;
 +    private final List<PColumn> mappedColumns;
 +    private final int extendedColumnsOffset;
 +    private final TableRef extendedTableRef;
 +
 +    public TableMapping(PTable table) {
 +        this.tableRef = new TableRef(table);
 +        this.dataTableRef = null;
 +        this.mappedColumns = getMappedColumns(table);
 +        this.extendedColumnsOffset = mappedColumns.size();
 +        this.extendedTableRef = null;
 +    }
 +
 +    public TableMapping(TableRef tableRef, TableRef dataTableRef, boolean extend) throws SQLException {
 +        this.tableRef = tableRef;
 +        this.dataTableRef = dataTableRef;
 +        if (!extend) {
 +            this.mappedColumns = getMappedColumns(tableRef.getTable());
 +            this.extendedColumnsOffset = mappedColumns.size();
 +            this.extendedTableRef = null;            
 +        } else {
 +            this.mappedColumns = Lists.newArrayList();
 +            this.mappedColumns.addAll(getMappedColumns(tableRef.getTable()));
 +            this.extendedColumnsOffset = mappedColumns.size();
 +            final PTable dataTable = dataTableRef.getTable();
 +            final List<PColumn> projectedColumns = getDataTableMappedColumns(dataTableRef, mappedColumns);
 +            this.mappedColumns.addAll(projectedColumns);
 +            PTable extendedTable = PTableImpl.makePTable(dataTable.getTenantId(),
 +                    TupleProjectionCompiler.PROJECTED_TABLE_SCHEMA, dataTable.getName(),
 +                    PTableType.PROJECTED, null, dataTable.getTimeStamp(),
 +                    dataTable.getSequenceNumber(), dataTable.getPKName(), null,
 +                    projectedColumns, null, null, Collections.<PTable>emptyList(),
 +                    dataTable.isImmutableRows(), Collections.<PName>emptyList(), null, null,
 +                    dataTable.isWALDisabled(), false, dataTable.getStoreNulls(),
 +                    dataTable.getViewType(), null, null, dataTable.rowKeyOrderOptimizable(),
 +                    dataTable.isTransactional(), dataTable.getUpdateCacheFrequency(),
 +                    dataTable.getIndexDisableTimestamp(), dataTable.isNamespaceMapped(),
 +                    dataTable.getAutoPartitionSeqName(), dataTable.isAppendOnlySchema());
 +            this.extendedTableRef = new TableRef(extendedTable);
 +        }
 +    }
 +
 +    public TableRef getTableRef() {
 +        return tableRef;
 +    }
 +    
 +    public PTable getPTable() {
 +        return tableRef.getTable();
 +    }
 +    
 +    public TableRef getDataTableRef() {
 +        return dataTableRef;
 +    }
 +    
 +    public List<PColumn> getMappedColumns() {
 +        return mappedColumns;
 +    }
 +    
 +    public boolean hasExtendedColumns() {
 +        return extendedTableRef != null;
 +    }
 +    
-     public ColumnExpression newColumnExpression(int index) {
++    public Expression newColumnExpression(int index) {
 +        ColumnRef colRef = new ColumnRef(
 +                index < extendedColumnsOffset ? tableRef : extendedTableRef,
 +                this.mappedColumns.get(index).getPosition());
-         return colRef.newColumnExpression();
++        try {
++            return colRef.newColumnExpression();
++        } catch (SQLException e) {
++            throw new RuntimeException(e);
++        }
 +    }
 +    
 +    public ImmutableBitSet getDefaultExtendedColumnRef() {
 +        return ImmutableBitSet.range(extendedColumnsOffset, mappedColumns.size());
 +    }
 +    
 +    public ImmutableBitSet getExtendedColumnRef(List<RexNode> exprs) {
 +        if (!hasExtendedColumns()) {
 +            return ImmutableBitSet.of();
 +        }
 +        
 +        ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
 +        for (RexNode expr : exprs) {
 +            builder.addAll(InputFinder.analyze(expr).inputBitSet.build());
 +        }
 +        for (int i = 0; i < extendedColumnsOffset; i++) {
 +            builder.clear(i);
 +        }
 +        return builder.build();
 +    }
 +   
 +    public Pair<Integer, Integer> getExtendedColumnReferenceCount(ImmutableBitSet columnRef) {
 +        Set<String> cf = Sets.newHashSet();
 +        int columnCount = 0;
 +        for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) {
 +            if (columnRef.get(i)) {
 +                PColumn dataColumn = ((ProjectedColumn) mappedColumns.get(i))
 +                        .getSourceColumnRef().getColumn();
 +                cf.add(dataColumn.getFamilyName().getString());
 +                columnCount++;
 +            }
 +        }
 +        return new Pair<Integer, Integer>(cf.size(), columnCount);
 +    }
 +    
 +    public PTable createProjectedTable(boolean retainPKColumns) {
 +        List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
 +        List<PColumn> columns = retainPKColumns ?
 +                  tableRef.getTable().getColumns() : mappedColumns.subList(0, extendedColumnsOffset);
 +        for (PColumn column : columns) {
 +            sourceColumnRefs.add(new ColumnRef(tableRef, column.getPosition()));
 +        }
 +        if (extendedColumnsOffset < mappedColumns.size()) {
 +            for (PColumn column : mappedColumns.subList(extendedColumnsOffset, mappedColumns.size())) {
 +                sourceColumnRefs.add(new ColumnRef(extendedTableRef, column.getPosition()));
 +            }
 +        }
 +        
 +        try {
 +            return TupleProjectionCompiler.createProjectedTable(tableRef, sourceColumnRefs, retainPKColumns);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
 +    public TupleProjector createTupleProjector(boolean retainPKColumns) {
 +        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
 +        List<Expression> exprs = Lists.<Expression> newArrayList();
 +        for (int i = 0; i < mappedColumns.size(); i++) {
 +            if (!SchemaUtil.isPKColumn(mappedColumns.get(i)) || !retainPKColumns) {
 +                Expression expr = newColumnExpression(i);
 +                exprs.add(expr);
 +                builder.addField(expr);
 +            }
 +        }
 +        
 +        return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
 +    }
 +    
 +    public RowProjector createRowProjector() {
 +        List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList();
 +        for (int i = 0; i < mappedColumns.size(); i++) {
 +            PColumn column = mappedColumns.get(i);
 +            Expression expr = newColumnExpression(i); // Do not use column.position() here.
 +            columnProjectors.add(new ExpressionProjector(column.getName().getString(), tableRef.getTable().getName().getString(), expr, false));
 +        }
 +        // TODO get estimate row size
 +        return new RowProjector(columnProjectors, 0, false);
 +    }
 +    
 +    public void setupScanForExtendedTable(Scan scan, ImmutableBitSet extendedColumnRef,
 +            PhoenixConnection connection) throws SQLException {
 +        if (extendedTableRef == null || extendedColumnRef.isEmpty()) {
 +            return;
 +        }
 +        
 +        TableRef dataTableRef = null;
 +        List<PColumn> dataColumns = Lists.newArrayList();
 +        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
 +        List<Expression> exprs = Lists.<Expression> newArrayList();
 +        for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) {
 +            ProjectedColumn column = (ProjectedColumn) mappedColumns.get(i);
 +            builder.addField(column);
 +            if (extendedColumnRef.get(i)) {
 +                dataColumns.add(column.getSourceColumnRef().getColumn());
 +                exprs.add(column.getSourceColumnRef().newColumnExpression());
 +                if (dataTableRef == null) {
 +                    dataTableRef = column.getSourceColumnRef().getTableRef();
 +                }
 +            } else {
 +                exprs.add(LiteralExpression.newConstant(null));
 +            }
 +        }
 +        if (dataColumns.isEmpty()) {
 +            return;
 +        }
 +        
 +        // Set data columns to be join back from data table.
 +        serializeDataTableColumnsToJoin(scan, dataColumns);
 +        // Set tuple projector of the data columns.
 +        TupleProjector projector = new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
 +        TupleProjector.serializeProjectorIntoScan(scan, projector, IndexUtil.INDEX_PROJECTOR);
 +        PTable dataTable = dataTableRef.getTable();
 +        // Set index maintainer of the local index.
 +        serializeIndexMaintainerIntoScan(scan, dataTable, connection);
 +        // Set view constants if exists.
 +        serializeViewConstantsIntoScan(scan, dataTable);
 +    }
 +
 +    private static void serializeDataTableColumnsToJoin(Scan scan, List<PColumn> dataColumns) {
 +        ByteArrayOutputStream stream = new ByteArrayOutputStream();
 +        try {
 +            DataOutputStream output = new DataOutputStream(stream);
 +            WritableUtils.writeVInt(output, dataColumns.size());
 +            for (PColumn column : dataColumns) {
 +                Bytes.writeByteArray(output, column.getFamilyName().getBytes());
 +                Bytes.writeByteArray(output, column.getName().getBytes());
 +            }
 +            scan.setAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN, stream.toByteArray());
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        } finally {
 +            try {
 +                stream.close();
 +            } catch (IOException e) {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +    }
 +
 +    private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable, PhoenixConnection connection) throws SQLException {
 +        PName name = getPTable().getName();
 +        List<PTable> indexes = Lists.newArrayListWithExpectedSize(1);
 +        for (PTable index : dataTable.getIndexes()) {
 +            if (index.getName().equals(name) && index.getIndexType() == IndexType.LOCAL) {
 +                indexes.add(index);
 +                break;
 +            }
 +        }
 +        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 +        IndexMaintainer.serialize(dataTable, ptr, indexes, connection);
 +        scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr));
 +        if (dataTable.isTransactional()) {
 +            scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
 +        }
 +        }
 +    }
 +
 +    private static void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {
 +        int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0);
 +        int nViewConstants = 0;
 +        if (dataTable.getType() == PTableType.VIEW) {
 +            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 +            List<PColumn> dataPkColumns = dataTable.getPKColumns();
 +            for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
 +                PColumn dataPKColumn = dataPkColumns.get(i);
 +                if (dataPKColumn.getViewConstant() != null) {
 +                    nViewConstants++;
 +                }
 +            }
 +            if (nViewConstants > 0) {
 +                byte[][] viewConstants = new byte[nViewConstants][];
 +                int j = 0;
 +                for (int i = dataPosOffset; i < dataPkColumns.size(); i++) {
 +                    PColumn dataPkColumn = dataPkColumns.get(i);
 +                    if (dataPkColumn.getViewConstant() != null) {
 +                        if (IndexUtil.getViewConstantValue(dataPkColumn, ptr)) {
 +                            viewConstants[j++] = ByteUtil.copyKeyBytesIfNecessary(ptr);
 +                        } else {
 +                            throw new IllegalStateException();
 +                        }
 +                    }
 +                }
 +                serializeViewConstantsIntoScan(viewConstants, scan);
 +            }
 +        }
 +    }
 +
 +    private static void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) {
 +        ByteArrayOutputStream stream = new ByteArrayOutputStream();
 +        try {
 +            DataOutputStream output = new DataOutputStream(stream);
 +            WritableUtils.writeVInt(output, viewConstants.length);
 +            for (byte[] viewConstant : viewConstants) {
 +                Bytes.writeByteArray(output, viewConstant);
 +            }
 +            scan.setAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS, stream.toByteArray());
 +        } catch (IOException e) {
 +            throw new RuntimeException(e);
 +        } finally {
 +            try {
 +                stream.close();
 +            } catch (IOException e) {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +    }
 +    
 +    private static List<PColumn> getMappedColumns(PTable pTable) {
 +        int initPosition =
 +                  (pTable.getBucketNum() == null ? 0 : 1)
 +                + (pTable.isMultiTenant() ? 1 : 0)
 +                + (pTable.getViewIndexId() == null ? 0 : 1);
 +        List<PColumn> columns = Lists.newArrayListWithExpectedSize(pTable.getColumns().size() - initPosition);
 +        for (int i = initPosition; i < pTable.getPKColumns().size(); i++) {
 +            columns.add(pTable.getPKColumns().get(i));
 +        }
 +        for (PColumnFamily family : pTable.getColumnFamilies()) {
 +            for (PColumn column : family.getColumns()) {
 +                columns.add(column);
 +            }
 +        }
 +        
 +        return columns;
 +    }
 +    
 +    private static List<PColumn> getDataTableMappedColumns(TableRef dataTableRef, List<PColumn> mappedColumns) {
 +        Set<String> names = Sets.newHashSet();
 +        for (PColumn column : mappedColumns) {
 +            names.add(column.getName().getString());
 +        }
 +        List<PColumn> projectedColumns = new ArrayList<PColumn>();
 +        for (PColumnFamily cf : dataTableRef.getTable().getColumnFamilies()) {
 +            for (PColumn sourceColumn : cf.getColumns()) {
 +                String colName = IndexUtil.getIndexColumnName(sourceColumn);
 +                if (!names.contains(colName)) {
 +                    ColumnRef sourceColumnRef =
 +                            new ColumnRef(dataTableRef, sourceColumn.getPosition());
 +                    PColumn column = new ProjectedColumn(PNameFactory.newName(colName),
 +                            cf.getName(), projectedColumns.size(),
 +                            sourceColumn.isNullable(), sourceColumnRef);
 +                    projectedColumns.add(column);
 +                }
 +            }            
 +        }
 +        
 +        return projectedColumns;
 +    }
 +}
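
Note on the newColumnExpression() change above: ColumnRef.newColumnExpression() now declares a checked SQLException, so TableMapping widens its return type to Expression and rethrows the exception unchecked. A minimal, self-contained sketch of that wrap-and-rethrow pattern (the lookup method below is a hypothetical stand-in, not Phoenix API):

    import java.sql.SQLException;

    public class WrapCheckedExample {
        // Mirrors TableMapping.newColumnExpression: a checked SQLException from a
        // lower layer is rethrown as an unchecked RuntimeException so the method
        // keeps an exception-free signature.
        static String resolveColumn(int index) {
            try {
                return lookup(index);
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }

        // Hypothetical stand-in for ColumnRef.newColumnExpression().
        static String lookup(int index) throws SQLException {
            if (index < 0) throw new SQLException("bad index: " + index);
            return "col" + index;
        }

        public static void main(String[] args) {
            System.out.println(resolveColumn(3)); // prints col3
        }
    }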

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
index 2ddc3ea,0000000..cdcd74f
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
@@@ -1,52 -1,0 +1,51 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.util.List;
 +
 +import org.apache.calcite.util.ImmutableIntList;
 +import org.apache.phoenix.calcite.PhoenixSequence;
 +import org.apache.phoenix.calcite.TableMapping;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.SequenceValueExpression;
 +import org.apache.phoenix.execute.RuntimeContext;
 +import org.apache.phoenix.execute.TupleProjector;
- import org.apache.phoenix.expression.ColumnExpression;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.parse.SequenceValueParseNode;
 +import org.apache.phoenix.schema.types.PDataType;
 +
 +/** Holds context for a traversal over a tree of relational expressions
 + * to convert it to an executable plan. */
 +public interface PhoenixRelImplementor {
 +    QueryPlan visitInput(int i, PhoenixQueryRel input);
-     ColumnExpression newColumnExpression(int index);
++    Expression newColumnExpression(int index);
 +    @SuppressWarnings("rawtypes")
 +    Expression newBindParameterExpression(int index, PDataType type, Integer maxLength);
 +    @SuppressWarnings("rawtypes")
 +    Expression newFieldAccessExpression(String variableId, int index, PDataType type);
 +    SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op);
 +    RuntimeContext getRuntimeContext();
 +    void setTableMapping(TableMapping tableMapping);
 +    TableMapping getTableMapping();
 +    void setSequenceManager(SequenceManager sequenceManager);
 +    void pushContext(ImplementorContext context);
 +    ImplementorContext popContext();
 +    ImplementorContext getCurrentContext();
 +    TupleProjector project(List<Expression> exprs);
 +    
 +    class ImplementorContext {
 +        public final boolean retainPKColumns;
 +        public final boolean forceProject;
 +        public final ImmutableIntList columnRefList;
 +        
 +        public ImplementorContext(boolean retainPKColumns, boolean forceProject, ImmutableIntList columnRefList) {
 +            this.retainPKColumns = retainPKColumns;
 +            this.forceProject = forceProject;
 +            this.columnRefList = columnRefList;
 +        }
 +        
 +        public ImplementorContext withColumnRefList(ImmutableIntList columnRefList) {
 +            return new ImplementorContext(this.retainPKColumns, this.forceProject, columnRefList);
 +        }
 +    }
 +}
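
Note on the interface change above: newColumnExpression(int) now returns the supertype Expression instead of ColumnExpression. Callers that only consume Expression are unaffected, since widening a return type stays source-compatible for them. A small sketch with illustrative stand-in types (not Phoenix's real hierarchy):

    interface Expr {}

    class ColumnExpr implements Expr {}

    interface Implementor {
        Expr newColumnExpression(int index); // was: ColumnExpr newColumnExpression(int index)
    }

    class ImplementorImpl implements Implementor {
        @Override
        public Expr newColumnExpression(int index) {
            return new ColumnExpr(); // implementations may still hand back the subtype
        }
    }

    public class WideningExample {
        public static void main(String[] args) {
            Implementor impl = new ImplementorImpl();
            Expr e = impl.newColumnExpression(0); // callers depend only on Expr
            System.out.println(e.getClass().getSimpleName()); // ColumnExpr
        }
    }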

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
index fa4649b,0000000..d581ec0
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
@@@ -1,140 -1,0 +1,139 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.sql.SQLException;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Stack;
 +
 +import org.apache.phoenix.calcite.PhoenixSequence;
 +import org.apache.phoenix.calcite.TableMapping;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.SequenceValueExpression;
 +import org.apache.phoenix.coprocessor.MetaDataProtocol;
 +import org.apache.phoenix.execute.RuntimeContext;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.expression.BindParameterExpression;
- import org.apache.phoenix.expression.ColumnExpression;
 +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.parse.ParseNodeFactory;
 +import org.apache.phoenix.parse.SequenceValueParseNode;
 +import org.apache.phoenix.parse.TableName;
 +import org.apache.phoenix.schema.KeyValueSchema;
 +import org.apache.phoenix.schema.PColumn;
 +import org.apache.phoenix.schema.PColumnImpl;
 +import org.apache.phoenix.schema.PName;
 +import org.apache.phoenix.schema.PNameFactory;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.PTableImpl;
 +import org.apache.phoenix.schema.PTableType;
 +import org.apache.phoenix.schema.types.PDataType;
 +import com.google.common.collect.Lists;
 +
 +public class PhoenixRelImplementorImpl implements PhoenixRelImplementor {
 +    private final RuntimeContext runtimeContext;
 +      private Stack<ImplementorContext> contextStack;
 +      private SequenceManager sequenceManager;
 +      private TableMapping tableMapping;
 +      
 +      public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) {
 +          this.runtimeContext = runtimeContext;
 +          this.contextStack = new Stack<ImplementorContext>();
 +      }
 +      
 +    @Override
 +    public QueryPlan visitInput(int i, PhoenixQueryRel input) {
 +        return input.implement(this);
 +    }
 +
 +      @Override
-       public ColumnExpression newColumnExpression(int index) {
++      public Expression newColumnExpression(int index) {
 +              return tableMapping.newColumnExpression(index);
 +      }
 +    
 +    @SuppressWarnings("rawtypes")
 +    @Override
 +    public Expression newBindParameterExpression(int index, PDataType type, Integer maxLength) {
 +        return new BindParameterExpression(index, type, maxLength, runtimeContext);
 +    }
 +    
 +    @SuppressWarnings("rawtypes")
 +    @Override
 +    public Expression newFieldAccessExpression(String variableId, int index, PDataType type) {
 +        Expression fieldAccessExpr = runtimeContext.getCorrelateVariable(variableId).newExpression(index);
 +        return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr);
 +    }
 +    
 +    @Override
 +    public SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op) {
 +        PName tenantName = seq.pc.getTenantId();
 +        TableName tableName = TableName.create(seq.schemaName, seq.sequenceName);
 +        try {
 +            return sequenceManager.newSequenceReference(tenantName, tableName, null, op);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
 +    @Override
 +    public RuntimeContext getRuntimeContext() {
 +        return runtimeContext;
 +    }
 +
 +    @Override
 +      public void setTableMapping(TableMapping tableMapping) {
 +              this.tableMapping = tableMapping;
 +      }
 +    
 +    @Override
 +    public TableMapping getTableMapping() {
 +        return this.tableMapping;
 +    }
 +    
 +    @Override
 +    public void setSequenceManager(SequenceManager sequenceManager) {
 +        this.sequenceManager = sequenceManager;
 +    }
 +
 +    @Override
 +    public void pushContext(ImplementorContext context) {
 +        this.contextStack.push(context);
 +    }
 +
 +    @Override
 +    public ImplementorContext popContext() {
 +        return contextStack.pop();
 +    }
 +
 +    @Override
 +    public ImplementorContext getCurrentContext() {
 +        return contextStack.peek();
 +    }
 +    
 +    @Override
 +    public TupleProjector project(List<Expression> exprs) {
 +        KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
 +        List<PColumn> columns = Lists.<PColumn>newArrayList();
 +        for (int i = 0; i < exprs.size(); i++) {
 +            String name = ParseNodeFactory.createTempAlias();
 +            Expression expr = exprs.get(i);
 +            builder.addField(expr);
 +            columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
 +                    expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
 +                    i, expr.getSortOrder(), null, null, false, name, false, false));
 +        }
 +        try {
 +            PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
 +                    PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
 +                    null, null, columns, null, null, Collections.<PTable>emptyList(),
 +                    false, Collections.<PName>emptyList(), null, null, false, false, false, null,
 +                    null, null, true, false, 0, 0, false, null, false);
 +            this.setTableMapping(new TableMapping(pTable));
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +        
 +        return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
 +    }
 +
 +}
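
Note on PhoenixRelImplementorImpl above: it keeps a Stack<ImplementorContext> that is pushed before visiting a child rel node and popped afterwards, so getCurrentContext() always reflects the innermost traversal. A minimal sketch of that pattern with a stand-in context type (names here are illustrative, not Phoenix API):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class ContextStackExample {
        static class Context {
            final boolean retainPKColumns;
            Context(boolean retainPKColumns) { this.retainPKColumns = retainPKColumns; }
        }

        private final Deque<Context> stack = new ArrayDeque<>();

        void pushContext(Context c) { stack.push(c); }      // enter a child traversal
        Context popContext()        { return stack.pop(); } // restore on the way out
        Context getCurrentContext() { return stack.peek(); }

        public static void main(String[] args) {
            ContextStackExample impl = new ContextStackExample();
            impl.pushContext(new Context(true));
            System.out.println(impl.getCurrentContext().retainPKColumns); // true
            impl.popContext();
        }
    }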

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index 0915d09,0000000..579914a
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@@ -1,395 -1,0 +1,396 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
 +
 +import java.sql.ParameterMetaData;
 +import java.sql.ResultSet;
 +import java.sql.SQLException;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptTable;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.prepare.Prepare.CatalogReader;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.TableModify;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 +import org.apache.phoenix.calcite.PhoenixTable;
 +import org.apache.phoenix.compile.ColumnResolver;
 +import org.apache.phoenix.compile.ExplainPlan;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.MutationPlan;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.compile.StatementPlan;
 +import org.apache.phoenix.exception.SQLExceptionCode;
 +import org.apache.phoenix.exception.SQLExceptionInfo;
 +import org.apache.phoenix.execute.MutationState;
 +import org.apache.phoenix.execute.MutationState.RowMutationState;
 +import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
 +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 +import org.apache.phoenix.iterate.ResultIterator;
 +import org.apache.phoenix.jdbc.PhoenixConnection;
 +import org.apache.phoenix.jdbc.PhoenixResultSet;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.query.ConnectionQueryServices;
 +import org.apache.phoenix.query.QueryServices;
 +import org.apache.phoenix.query.QueryServicesOptions;
 +import org.apache.phoenix.schema.IllegalDataException;
 +import org.apache.phoenix.schema.PColumn;
 +import org.apache.phoenix.schema.PName;
 +import org.apache.phoenix.schema.PRow;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.SortOrder;
 +import org.apache.phoenix.schema.TableRef;
 +import org.apache.phoenix.schema.types.PLong;
 +import org.apache.phoenix.util.ByteUtil;
 +import org.apache.phoenix.util.MetaDataUtil;
 +import org.apache.phoenix.util.ScanUtil;
 +import org.apache.phoenix.util.SchemaUtil;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Maps;
 +
 +public class PhoenixTableModify extends TableModify implements PhoenixRel {
 +
 +    public PhoenixTableModify(RelOptCluster cluster, RelTraitSet traits,
 +            RelOptTable table, CatalogReader catalogReader, RelNode child,
 +            Operation operation, List<String> updateColumnList, boolean flattened) {
 +        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened);
 +        assert operation == Operation.INSERT || operation == Operation.DELETE;
 +    }
 +
 +    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
 +      return new PhoenixTableModify(
 +          getCluster(),
 +          traitSet,
 +          getTable(),
 +          getCatalogReader(),
 +          sole(inputs),
 +          getOperation(),
 +          getUpdateColumnList(),
 +          isFlattened());
 +    }
 +
 +    @Override
 +    public StatementPlan implement(PhoenixRelImplementor implementor) {
 +        final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input);
 +        final RowProjector projector = implementor.getTableMapping().createRowProjector();
 +
 +        final PhoenixTable targetTable = getTable().unwrap(PhoenixTable.class);
 +        final PhoenixConnection connection = targetTable.pc;
 +        final TableRef targetTableRef = targetTable.tableMapping.getTableRef();
 +        
 +        if (getOperation() == Operation.INSERT) {
 +            return upsert(connection, targetTable, targetTableRef, queryPlan, projector);
 +        }
 +        
 +        // delete
 +        return delete(connection, targetTable, targetTableRef, queryPlan, projector);
 +    }
 +    
 +    private static MutationPlan upsert(final PhoenixConnection connection,
 +            final PhoenixTable targetTable, final TableRef targetTableRef,
 +            final QueryPlan queryPlan, final RowProjector projector) {
 +        try (PhoenixStatement stmt = new PhoenixStatement(connection)) {
 +            final ColumnResolver resolver = FromCompiler.getResolver(targetTableRef);
 +            final StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
 +
 +            // TODO TenantId, ViewIndexId, UpdatableViewColumns
 +            final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns();
 +            final int[] columnIndexes = new int[mappedColumns.size()];
 +            final int[] pkSlotIndexes = new int[mappedColumns.size()];
 +            for (int i = 0; i < columnIndexes.length; i++) {
 +                PColumn column = mappedColumns.get(i);
 +                if (SchemaUtil.isPKColumn(column)) {
 +                    pkSlotIndexes[i] = column.getPosition();
 +                }
 +                columnIndexes[i] = column.getPosition();
 +            }
 +            // TODO
 +            final boolean useServerTimestamp = false;
 +            
 +            return new MutationPlan() {
 +                @Override
 +                public ParameterMetaData getParameterMetaData() {
 +                    return queryPlan.getContext().getBindManager().getParameterMetaData();
 +                }
 +
 +                @Override
 +                public StatementContext getContext() {
 +                    return context;
 +                }
 +
 +                @Override
 +                public TableRef getTargetRef() {
 +                    return targetTableRef;
 +                }
 +
 +                @Override
 +                public Set<TableRef> getSourceRefs() {
 +                    // TODO return originalQueryPlan.getSourceRefs();
 +                    return queryPlan.getSourceRefs();
 +                }
 +
 +                @Override
 +                public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
 +                    return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
 +                }
 +
 +                @Override
 +                public MutationState execute() throws SQLException {
 +                    ResultIterator iterator = queryPlan.iterator();
 +                    // simplest version, no run-on-server, no pipelined update
 +                    StatementContext childContext = queryPlan.getContext();
 +                    ConnectionQueryServices services = connection.getQueryServices();
 +                    int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
 +                            QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
 +                    int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
 +                    boolean isAutoCommit = connection.getAutoCommit();
 +                    byte[][] values = new byte[columnIndexes.length][];
 +                    int rowCount = 0;
 +                    Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
 +                    PTable table = targetTableRef.getTable();
 +                    try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
 +                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 +                        while (rs.next()) {
 +                            for (int i = 0; i < values.length; i++) {
 +                                PColumn column = table.getColumns().get(columnIndexes[i]);
 +                                byte[] bytes = rs.getBytes(i + 1);
 +                                ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
 +                                Object value = rs.getObject(i + 1);
 +                                int rsPrecision = rs.getMetaData().getPrecision(i + 1);
 +                                Integer precision = rsPrecision == 0 ? null : rsPrecision;
 +                                int rsScale = rs.getMetaData().getScale(i + 1);
 +                                Integer scale = rsScale == 0 ? null : rsScale;
 +                                // We are guaranteed that the two columns will have compatible types,
 +                                // as we checked that before.
-                                 if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
-                                         column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
++                                if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), SortOrder.getDefault(), precision,
++                                        scale, column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
 +                                        SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
 +                                        .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
 +                                        .buildException(); }
 +                                column.getDataType().coerceBytes(ptr, value, column.getDataType(),
 +                                        precision, scale, SortOrder.getDefault(),
 +                                        column.getMaxLength(), column.getScale(), column.getSortOrder(),
 +                                        table.rowKeyOrderOptimizable());
 +                                values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
 +                            }
-                             setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp);
++                            // TODO onDupKeyBytes
++                            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp, null);
 +                            rowCount++;
 +                            // Commit a batch if auto commit is true and we're at our batch size
 +                            if (isAutoCommit && rowCount % batchSize == 0) {
 +                                MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
 +                                connection.getMutationState().join(state);
 +                                connection.getMutationState().send();
 +                                mutation.clear();
 +                            }
 +                        }
 +                        // If auto commit is true, this last batch will be committed upon return
 +                        return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
 +                    }
 +                }
 +
 +                @Override
 +                public ExplainPlan getExplainPlan() throws SQLException {
 +                    List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
 +                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
 +                    planSteps.add("UPSERT SELECT");
 +                    planSteps.addAll(queryPlanSteps);
 +                    return new ExplainPlan(planSteps);
 +                }                
 +            };
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
-     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp) {
++    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp, byte[] onDupKeyBytes) {
 +        Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
 +        byte[][] pkValues = new byte[table.getPKColumns().size()][];
 +        // If the table uses salting, the first byte is the salting byte, set to an empty array
 +        // here and we will fill in the byte later in PRowImpl.
 +        if (table.getBucketNum() != null) {
 +            pkValues[0] = new byte[] {0};
 +        }
 +        Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
 +        RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
 +        for (int i = 0; i < values.length; i++) {
 +            byte[] value = values[i];
 +            PColumn column = table.getColumns().get(columnIndexes[i]);
 +            if (SchemaUtil.isPKColumn(column)) {
 +                pkValues[pkSlotIndex[i]] = value;
 +                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
 +                    if (!useServerTimestamp) {
 +                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
 +                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
 +                        if (rowTimestamp < 0) {
 +                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
 +                        }
 +                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
 +                    } 
 +                }
 +            } else {
 +                columnValues.put(column, value);
 +            }
 +        }
 +        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
 +        table.newKey(ptr, pkValues);
-         mutation.put(ptr, new RowMutationState(columnValues, connection.getStatementExecutionCounter(), rowTsColInfo));
++        mutation.put(ptr, new RowMutationState(columnValues, connection.getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
 +    }
 +
 +    private static MutationPlan delete(final PhoenixConnection connection,
 +            final PhoenixTable targetTable, final TableRef targetTableRef,
 +            final QueryPlan queryPlan, final RowProjector projector) {
 +        final StatementContext context = queryPlan.getContext();
 +        // TODO
 +        final boolean deleteFromImmutableIndexToo = false;
 +        return new MutationPlan() {
 +            @Override
 +            public ParameterMetaData getParameterMetaData() {
 +                return context.getBindManager().getParameterMetaData();
 +            }
 +
 +            @Override
 +            public StatementContext getContext() {
 +                return context;
 +            }
 +
 +            @Override
 +            public TableRef getTargetRef() {
 +                return targetTableRef;
 +            }
 +
 +            @Override
 +            public Set<TableRef> getSourceRefs() {
 +                // TODO dataPlan.getSourceRefs();
 +                return queryPlan.getSourceRefs();
 +            }
 +
 +            @Override
 +            public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
 +                return org.apache.phoenix.jdbc.PhoenixStatement.Operation.DELETE;
 +            }
 +
 +            @Override
 +            public MutationState execute() throws SQLException {
 +                ResultIterator iterator = queryPlan.iterator();
 +                try {
 +                    // TODO hasLimit??
 +                    return deleteRows(context, targetTableRef, deleteFromImmutableIndexToo ? queryPlan.getTableRef() : null, iterator, projector, queryPlan.getTableRef());
 +                } finally {
 +                    iterator.close();
 +                }
 +            }
 +
 +            @Override
 +            public ExplainPlan getExplainPlan() throws SQLException {
 +                List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
 +                List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
 +                planSteps.add("DELETE ROWS");
 +                planSteps.addAll(queryPlanSteps);
 +                return new ExplainPlan(planSteps);
 +            }
 +        };
 +    }
 +    
 +    private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
 +        PTable table = targetTableRef.getTable();
 +        PhoenixStatement statement = childContext.getStatement();
 +        PhoenixConnection connection = statement.getConnection();
 +        PName tenantId = connection.getTenantId();
 +        byte[] tenantIdBytes = null;
 +        if (tenantId != null) {
 +            tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, table.getViewIndexId() != null);
 +        }
 +        final boolean isAutoCommit = connection.getAutoCommit();
 +        ConnectionQueryServices services = connection.getQueryServices();
 +        final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
 +        final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
 +        Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
 +        Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
 +        // If indexTableRef is set, we're deleting the rows from both the index table and
 +        // the data table through a single query to save executing an additional one.
 +        if (indexTableRef != null) {
 +            indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
 +        }
 +        List<PColumn> pkColumns = table.getPKColumns();
 +        boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
 +        boolean isSharedViewIndex = table.getViewIndexId() != null;
 +        int offset = (table.getBucketNum() == null ? 0 : 1);
 +        byte[][] values = new byte[pkColumns.size()][];
 +        if (isMultiTenant) {
 +            values[offset++] = tenantIdBytes;
 +        }
 +        if (isSharedViewIndex) {
 +            values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
 +        }
 +        try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
 +            int rowCount = 0;
 +            while (rs.next()) {
 +                ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
 +                // Use tuple directly, as projector would not have all the PK columns from
 +                // our index table inside of our projection. Since the tables are equal,
 +                // there's no translation required.
 +                if (sourceTableRef.equals(targetTableRef)) {
 +                    rs.getCurrentRow().getKey(ptr);
 +                } else {
 +                    for (int i = offset; i < values.length; i++) {
 +                        byte[] byteValue = rs.getBytes(i+1-offset);
 +                        // The ResultSet.getBytes() call will have inverted it - we need to invert it back.
 +                        // TODO: consider going under the hood and just getting the bytes
 +                        if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) {
 +                            byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
 +                            byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length);
 +                        }
 +                        values[i] = byteValue;
 +                    }
 +                    table.newKey(ptr, values);
 +                }
 +                // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
 +                // row key will already have its value.
-                 mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
++                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
 +                if (indexTableRef != null) {
 +                    ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
 +                    rs.getCurrentRow().getKey(indexPtr);
-                     indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
++                    indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
 +                }
 +                if (mutations.size() > maxSize) {
 +                    throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
 +                }
 +                rowCount++;
 +                // Commit a batch if auto commit is true and we're at our batch size
 +                if (isAutoCommit && rowCount % batchSize == 0) {
 +                    MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection);
 +                    connection.getMutationState().join(state);
 +                    if (indexTableRef != null) {
 +                        MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
 +                        connection.getMutationState().join(indexState);
 +                    }
 +                    connection.getMutationState().send();
 +                    mutations.clear();
 +                    if (indexMutations != null) {
 +                        indexMutations.clear();
 +                    }
 +                }
 +            }
 +
 +            // If auto commit is true, this last batch will be committed upon return
 +            int nCommittedRows = rowCount / batchSize * batchSize;
 +            MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection);
 +            if (indexTableRef != null) {
 +                // To prevent the counting of these index rows, we have a negative for remainingRows.
 +                MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
 +                state.join(indexState);
 +            }
 +            return state;
 +        }
 +    }
 +}
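
The re-inversion above exists because Phoenix stores DESC-sorted PK columns
with their bytes bit-flipped on disk, while ResultSet.getBytes() surfaces the
flipped form. A minimal, self-contained sketch of that round trip (plain Java,
not the Phoenix API; SortOrder.invert performs the equivalent flip):

    import java.util.Arrays;

    public class DescInvertSketch {
        // Bitwise-complement every byte, which is how a DESC value is encoded
        // and decoded: applying the same flip twice restores the original.
        static byte[] invert(byte[] value) {
            byte[] out = new byte[value.length];
            for (int i = 0; i < value.length; i++) {
                out[i] = (byte) (value[i] ^ 0xff);
            }
            return out;
        }

        public static void main(String[] args) {
            byte[] stored = invert("abc".getBytes());   // as a DESC column holds it
            System.out.println(Arrays.equals("abc".getBytes(),
                    invert(stored)));                   // true: the flip is self-inverse
        }
    }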

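The rowCount % batchSize check in deleteRows() is the standard client-side
batching idiom: buffer delete markers and flush a full batch whenever auto
commit is on, leaving at most one partial batch for the caller to commit. A
small sketch under assumed names (pending, the println stand-in for a flush)
rather than Phoenix's MutationState API:

    import java.util.HashMap;
    import java.util.Map;

    public class BatchedDeleteSketch {
        public static void main(String[] args) {
            int batchSize = 3;
            boolean autoCommit = true;
            Map<String, String> pending = new HashMap<>();
            int rowCount = 0;
            for (String key : new String[] { "k1", "k2", "k3", "k4", "k5" }) {
                pending.put(key, "<delete>");   // stands in for PRow.DELETE_MARKER
                rowCount++;
                if (autoCommit && rowCount % batchSize == 0) {
                    // analogous to joining into MutationState and calling send()
                    System.out.println("flushing batch of " + pending.size());
                    pending.clear();
                }
            }
            // mirrors nCommittedRows = rowCount / batchSize * batchSize in the diff
            int nCommittedRows = rowCount / batchSize * batchSize;
            System.out.println(nCommittedRows + " rows already sent; "
                    + pending.size() + " awaiting commit on return");
        }
    }
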
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index c24ab57,4159be1..bdc4da9
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@@ -119,18 -119,17 +119,21 @@@ public class SequenceManager 
              return dstSequenceValues[index];
          }
      }
 -
 +    
      public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) throws SQLException {
          PName tenantName = statement.getConnection().getTenantId();
 -        String tenantId = tenantName == null ? null : tenantName.getString();
          TableName tableName = node.getTableName();
 +        ParseNode numToAllocateNode = node.getNumToAllocateNode();        
 +        return newSequenceReference(tenantName, tableName, numToAllocateNode, node.getOp());
 +    }
 +    
 +    public SequenceValueExpression newSequenceReference(PName tenantName,
 +            TableName tableName, ParseNode numToAllocateNode, SequenceValueParseNode.Op op) throws SQLException {
 +        String tenantId = tenantName == null ? null : tenantName.getString();
+         if (tableName.getSchemaName() == null && statement.getConnection().getSchema() != null) {
+             tableName = TableName.create(statement.getConnection().getSchema(), tableName.getTableName());
+         }
          int nSaltBuckets = statement.getConnection().getQueryServices().getSequenceSaltBuckets();
 -        ParseNode numToAllocateNode = node.getNumToAllocateNode();
 -        
          long numToAllocate = determineNumToAllocate(tableName, numToAllocateNode);
          SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName(), nSaltBuckets);
          SequenceValueExpression expression = sequenceMap.get(key);
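
The two added lines above make an unqualified sequence name inherit the
connection's default schema before the SequenceKey is built. A tiny sketch of
that resolution rule (hypothetical helper names, not the Phoenix API):

    public class SequenceNameSketch {
        // Qualify a bare sequence name with the connection schema, if any.
        static String resolve(String schemaName, String sequenceName, String connectionSchema) {
            if (schemaName == null && connectionSchema != null) {
                schemaName = connectionSchema;   // mirrors TableName.create(...) above
            }
            return schemaName == null ? sequenceName : schemaName + "." + sequenceName;
        }

        public static void main(String[] args) {
            System.out.println(resolve(null, "MY_SEQ", "MY_SCHEMA"));   // MY_SCHEMA.MY_SEQ
            System.out.println(resolve("S1", "MY_SEQ", "MY_SCHEMA"));   // S1.MY_SEQ
        }
    }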

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 5875c0a,25f3bec..45cf6bb
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@@ -142,19 -145,13 +145,17 @@@ public abstract class BaseResultIterato
      }
      
      protected boolean useStats() {
 +        return useStats(this.context);
 +    }
 +    
 +    private static boolean useStats(StatementContext context) {
-         Scan scan = context.getScan();
-         boolean isPointLookup = context.getScanRanges().isPointLookup();
          /*
           * Don't use guide posts:
           * 1) If we're collecting stats, as in this case we need to scan entire
           * regions worth of data to track where to put the guide posts.
           * 2) If the query is going to be executed serially.
           */
--        if (ScanUtil.isAnalyzeTable(scan)) {
++        if (ScanUtil.isAnalyzeTable(context.getScan())) {
              return false;
          }
          return true;
@@@ -421,55 -422,40 +426,39 @@@
          return guideIndex;
      }
  
 -    private GuidePostsInfo getGuidePosts() throws SQLException {
 -        if (!useStats() || !StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))) {
 +    private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions, boolean useStats) throws SQLException {
-         if (!useStats) { return GuidePostsInfo.NO_GUIDEPOST; }
++        final byte[] physicalTableName = table.getPhysicalName().getBytes();
++        if (!useStats(context) || !StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))) {
+             return GuidePostsInfo.NO_GUIDEPOST;
+         }
  
-         GuidePostsInfo gps = null;
-         PTableStats tableStats = new MetaDataClient(context.getConnection()).getTableStats(table);
-         Map<byte[], GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
 -        TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+         for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
+             byte[] cf = where.getFirst();
+             if (cf != null) {
+                 whereConditions.add(cf);
+             }
+         }
 -        PTable table = getTable();
 -        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
 +        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
-         if (table.getColumnFamilies().isEmpty()) {
-             // For sure we can get the defaultCF from the table
-             gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF);
-         } else {
-             if (whereConditions.isEmpty() || whereConditions.contains(defaultCF)) {
-                 gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF);
-             } else {
-                 byte[] familyInWhere = whereConditions.iterator().next();
-                 GuidePostsInfo guidePostsInfo = guidePostMap.get(familyInWhere);
-                 if (guidePostsInfo != null) {
-                     gps = guidePostsInfo;
-                 } else {
-                     // As there are no guideposts collected for the where family we go with the default CF
-                     gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF);
+         byte[] cf = null;
+         if ( !table.getColumnFamilies().isEmpty() && !whereConditions.isEmpty() ) {
+             for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
+                 byte[] whereCF = where.getFirst();
+                 if (Bytes.compareTo(defaultCF, whereCF) == 0) {
+                     cf = defaultCF;
+                     break;
                  }
              }
+             if (cf == null) {
+                 cf = context.getWhereConditionColumns().get(0).getFirst();
+             }
          }
-         if (gps == null) { return GuidePostsInfo.NO_GUIDEPOST; }
-         return gps;
-     }
- 
-     private static GuidePostsInfo getDefaultFamilyGuidePosts(Map<byte[], GuidePostsInfo> guidePostMap, byte[] defaultCF) {
-         if (guidePostMap.get(defaultCF) != null) {
-                 return guidePostMap.get(defaultCF);
+         if (cf == null) {
+             cf = defaultCF;
          }
-         return null;
+         GuidePostsKey key = new GuidePostsKey(physicalTableName, cf);
+         return context.getConnection().getQueryServices().getTableStats(key);
      }
  
-     private static String toString(List<byte[]> gps) {
-         StringBuilder buf = new StringBuilder(gps.size() * 100);
-         buf.append("[");
-         for (int i = 0; i < gps.size(); i++) {
-             buf.append(Bytes.toStringBinary(gps.get(i)));
-             buf.append(",");
-             if (i > 0 && i < gps.size()-1 && (i % 10) == 0) {
-                 buf.append("\n");
-             }
-         }
-         buf.setCharAt(buf.length()-1, ']');
-         return buf.toString();
-     }
-     
      private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) {
          boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary);
          if (scan != null) {
@@@ -683,130 -662,6 +672,130 @@@
          return parallelScans;
      }
  
 +
 +    /**
 +     * Compute the estimated count of rows and bytes that will be scanned.
 +     * @return the estimated row count and the byte count.
 +     * @throws SQLException
 +     */
 +    public static Pair<Long, Long> getEstimatedCount(StatementContext context, PTable table) throws SQLException {
 +        if (table.getName() == null) { // empty table
 +            return new Pair<Long, Long>(null, null);
 +        }
 +        
 +        if (context.getScanRanges().isPointLookup()) {
 +            return new Pair<Long, Long>(1L, SchemaUtil.estimateRowSize(table));
 +        }
 +        
 +        TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
 +        for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
 +            byte[] cf = where.getFirst();
 +            if (cf != null) {
 +                whereConditions.add(cf);
 +            }
 +        }
 +        GuidePostsInfo gps = getGuidePosts(context, table, whereConditions, useStats(context));
 +        if (gps == GuidePostsInfo.NO_GUIDEPOST) {
 +            return new Pair<Long, Long>(null, null);
 +        }
 +        
 +        byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
 +        byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
 +        Scan scan = context.getScan();
 +        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
 +                .getAllTableRegions(table.getPhysicalName().getBytes());
 +        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
 +        ScanRanges scanRanges = context.getScanRanges();
 +        boolean isSalted = table.getBucketNum() != null;
 +        boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
 +        boolean traverseAllRegions = isSalted || isLocalIndex;
 +        if (!traverseAllRegions) {
 +            byte[] scanStartRow = scan.getStartRow();
 +            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
 +                startKey = scanStartRow;
 +            }
 +            byte[] scanStopRow = scan.getStopRow();
 +            if (stopKey.length == 0
 +                    || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
 +                stopKey = scanStopRow;
 +            }
 +        }
 +        
 +        int regionIndex = 0;
 +        int stopIndex = regionBoundaries.size();
 +        if (startKey.length > 0) {
 +            regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
 +        }
 +        if (stopKey.length > 0) {
 +            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
 +            if (isLocalIndex) {
 +                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
 +            }
 +        }
 +        
 +        ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
 +
 +        int gpsSize = gps.getGuidePostsCount();
 +        int keyOffset = 0;
 +        ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
 +        ImmutableBytesWritable guidePosts = gps.getGuidePosts();
 +        ByteArrayInputStream stream = null;
 +        DataInput input = null;
 +        PrefixByteDecoder decoder = null;
 +        int guideIndex = 0;
 +        long estimatedRows = 0;
 +        long estimatedSize = 0;
 +        try {
 +            if (gpsSize > 0) {
 +                stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
 +                input = new DataInputStream(stream);
 +                decoder = new PrefixByteDecoder(gps.getMaxLength());
 +                try {
 +                    while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0
 +                            && currentKey.getLength() != 0) {
 +                        guideIndex++;
 +                    }
 +                } catch (EOFException e) {}
 +            }
 +            byte[] currentKeyBytes = currentKey.copyBytes();
 +    
 +            // Merge bisect with guideposts for all but the last region
 +            while (regionIndex <= stopIndex) {
 +                byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
 +                byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
 +                if (regionIndex == stopIndex) {
 +                    endKey = stopKey;
 +                } else {
 +                    endKey = regionBoundaries.get(regionIndex);
 +                }
 +                HRegionLocation regionLocation = regionLocations.get(regionIndex);
 +                if (isLocalIndex) {
 +                    HRegionInfo regionInfo = regionLocation.getRegionInfo();
 +                    endRegionKey = regionInfo.getEndKey();
 +                    keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
 +                }
 +                try {
 +                    while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
 +                        Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
 +                                false);
 +                        if (newScan != null) {
-                             estimatedRows += gps.getRowCounts().get(guideIndex);
-                             estimatedSize += gps.getByteCounts().get(guideIndex);
++                            estimatedRows += gps.getRowCounts()[guideIndex];
++                            estimatedSize += gps.getByteCounts()[guideIndex];
 +                        }
 +                        currentKeyBytes = currentGuidePost.copyBytes();
 +                        currentGuidePost = PrefixByteCodec.decode(decoder, input);
 +                        currentGuidePostBytes = currentGuidePost.copyBytes();
 +                        guideIndex++;
 +                    }
 +                } catch (EOFException e) {}
 +                currentKeyBytes = endKey;
 +                regionIndex++;
 +            }
 +        } finally {
 +            if (stream != null) Closeables.closeQuietly(stream);
 +        }
 +        return new Pair<Long, Long>(estimatedRows, estimatedSize);
 +    }
     
      public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
          if (!reverse) {
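
getEstimatedCount() above walks the decoded guideposts that intersect the
scan's key range and sums their per-guidepost row and byte counts. A
simplified, self-contained sketch of just that counting idea (a sorted map
stands in for the prefix-encoded guidepost stream and the region handling):

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class GuidePostEstimateSketch {
        // Each entry maps a guidepost end key to {rowCount, byteCount} for the
        // span it closes; summing the spans inside (start, stop] approximates
        // the rows and bytes the scan will touch.
        static long[] estimate(NavigableMap<String, long[]> guidePosts, String start, String stop) {
            long rows = 0, bytes = 0;
            for (Map.Entry<String, long[]> gp : guidePosts.subMap(start, false, stop, true).entrySet()) {
                rows += gp.getValue()[0];
                bytes += gp.getValue()[1];
            }
            return new long[] { rows, bytes };
        }

        public static void main(String[] args) {
            NavigableMap<String, long[]> gps = new TreeMap<>();
            gps.put("f", new long[] { 100, 4096 });
            gps.put("m", new long[] { 120, 5120 });
            gps.put("t", new long[] {  80, 3072 });
            long[] est = estimate(gps, "g", "z");
            System.out.println("rows=" + est[0] + " bytes=" + est[1]);   // rows=200 bytes=8192
        }
    }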

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
index d11fd12,0be7c16..e8b0593
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
@@@ -50,7 -59,21 +59,21 @@@ public class ColumnDef 
      private final String expressionStr;
      private final boolean isRowTimestamp;
  
+     public ColumnDef(ColumnDef def, String expressionStr) {
+         this.columnDefName = def.columnDefName;
+         this.dataType = def.dataType;
+         this.isNull = def.isNull;
+         this.maxLength = def.maxLength;
+         this.scale = def.scale;
+         this.isPK = def.isPK;
+         this.sortOrder = def.sortOrder;
+         this.isArray = def.isArray;
+         this.arrSize = def.arrSize;
+         this.isRowTimestamp = def.isRowTimestamp;
+         this.expressionStr = expressionStr;
+     }
+ 
 -    ColumnDef(ColumnName columnDefName, String sqlTypeName, boolean isArray, Integer arrSize, Boolean isNull, Integer maxLength,
 +    public ColumnDef(ColumnName columnDefName, String sqlTypeName, boolean isArray, Integer arrSize, Boolean isNull, Integer maxLength,
              Integer scale, boolean isPK, SortOrder sortOrder, String expressionStr, boolean isRowTimestamp) {
          try {
              Preconditions.checkNotNull(sortOrder);
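
The new ColumnDef(ColumnDef, String) constructor above copies every field and
overrides only the expression string, so a column's DEFAULT can be rewritten
without rebuilding the whole definition. A minimal illustration of that
copy-with-override pattern using hypothetical stand-in types, not the Phoenix
classes:

    public class ColumnDefCopySketch {
        static final class Def {
            final String name;
            final String type;
            final String expressionStr;   // the only field the copy may replace
            Def(String name, String type, String expressionStr) {
                this.name = name;
                this.type = type;
                this.expressionStr = expressionStr;
            }
            Def(Def other, String expressionStr) {   // mirrors ColumnDef(ColumnDef, String)
                this(other.name, other.type, expressionStr);
            }
        }

        public static void main(String[] args) {
            Def base = new Def("CREATED", "DATE", null);
            Def withDefault = new Def(base, "CURRENT_DATE()");
            System.out.println(withDefault.name + " DEFAULT " + withDefault.expressionStr);
        }
    }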

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
index 512a1d2,07b412f..9ed4d1c
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
@@@ -124,11 -133,13 +133,13 @@@ public class StatisticsUtil 
          return key;
      }
  
 -    public static List<Result> readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf,
 +    public static List<Result> readStatisticsForDelete(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf,
              byte[] startKey, byte[] stopKey, long clientTimeStamp) throws IOException {
          List<Result> statsForRegion = new ArrayList<Result>();
-         Scan s = MetaDataUtil.newTableRowsScan(getAdjustedKey(startKey, tableNameBytes, cf, false),
-                 getAdjustedKey(stopKey, tableNameBytes, cf, true), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
+         Scan s = MetaDataUtil.newTableRowsScan(
+                 getAdjustedKey(startKey, tableNameBytes, cf, false),
+                 getAdjustedKey(stopKey, tableNameBytes, cf, true),
+                 MetaDataProtocol.MIN_TABLE_TIMESTAMP,
                  clientTimeStamp);
          s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
          ResultScanner scanner = null;
@@@ -144,26 -155,16 +155,39 @@@
              }
          }
          return statsForRegion;
 +    }
 +
-     public static PTableStats readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, long clientTimeStamp)
-             throws IOException {
-         return readStatistics(statsHTable, tableNameBytes, null, null, null, clientTimeStamp);
++    public static List<Result> readStatistics(HTableInterface statsHTable, byte[] tableNameBytes, ImmutableBytesPtr cf,
++            byte[] startKey, byte[] stopKey, long clientTimeStamp) throws IOException {
++        List<Result> statsForRegion = new ArrayList<Result>();
++        Scan s = MetaDataUtil.newTableRowsScan(
++                getAdjustedKey(startKey, tableNameBytes, cf, false),
++                getAdjustedKey(stopKey, tableNameBytes, cf, true), 
++                MetaDataProtocol.MIN_TABLE_TIMESTAMP,
++                clientTimeStamp);
++        ResultScanner scanner = null;
++        try {
++            scanner = statsHTable.getScanner(s);
++            Result result = null;
++            while ((result = scanner.next()) != null) {
++                statsForRegion.add(result);
++            }
++        } finally {
++            if (scanner != null) {
++                scanner.close();
++            }
++        }
++        return statsForRegion;
      }
  
-     public static PTableStats readStatistics(HTableInterface statsHTable,
-             byte[] tableNameBytes, ImmutableBytesPtr cf, byte[] startKey, byte[] stopKey,
-             long clientTimeStamp)
+     public static GuidePostsInfo readStatistics(HTableInterface statsHTable, GuidePostsKey key, long clientTimeStamp)
              throws IOException {
          ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-         Scan s;
-         if (cf == null) {
-             s = MetaDataUtil.newTableRowsScan(tableNameBytes, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);
-         } else {
-             s = MetaDataUtil.newTableRowsScan(getAdjustedKey(startKey, tableNameBytes, cf, false),
-                     getAdjustedKey(stopKey, tableNameBytes, cf, true), MetaDataProtocol.MIN_TABLE_TIMESTAMP,
-                     clientTimeStamp);
-         }
+         ptr.set(key.getColumnFamily());
+         byte[] tableNameBytes = key.getPhysicalName();
+         byte[] startKey = getStartKey(tableNameBytes, ptr);
+         byte[] endKey = getEndKey(tableNameBytes, ptr);
+         Scan s = MetaDataUtil.newTableRowsScan(startKey, endKey, MetaDataProtocol.MIN_TABLE_TIMESTAMP, clientTimeStamp);
          s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH_BYTES);
          s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT_BYTES);
          s.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
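
readStatistics() is now keyed by a GuidePostsKey, i.e. a (physical table,
column family) pair, instead of raw scan boundaries. A hedged sketch of a
composite byte[] key with value-based equality, as a cache key of that shape
would need (stand-in class, not Phoenix's GuidePostsKey):

    import java.util.Arrays;

    public class StatsKeySketch {
        static final class Key {
            final byte[] table;
            final byte[] family;
            Key(byte[] table, byte[] family) {
                this.table = table;
                this.family = family;
            }
            @Override public boolean equals(Object o) {
                return o instanceof Key
                        && Arrays.equals(table, ((Key) o).table)
                        && Arrays.equals(family, ((Key) o).family);
            }
            @Override public int hashCode() {
                return 31 * Arrays.hashCode(table) + Arrays.hashCode(family);
            }
        }

        public static void main(String[] args) {
            Key a = new Key("T".getBytes(), "0".getBytes());
            Key b = new Key("T".getBytes(), "0".getBytes());
            System.out.println(a.equals(b) && a.hashCode() == b.hashCode());   // true
        }
    }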

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
index 186370a,62aafa5..a009b5c
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
@@@ -269,28 -268,4 +269,32 @@@ public class CorrelatePlanTest 
              throw new RuntimeException(e);
          }        
      }
 +    
 +    private static class CorrelateVariableImpl implements CorrelateVariable {
 +        private final TableRef tableRef;
 +        private Tuple value;
 +        
 +        CorrelateVariableImpl(TableRef tableRef) {
 +            this.tableRef = tableRef;
 +        }
 +
 +        @Override
 +        public Expression newExpression(int index) {
-             return new ColumnRef(tableRef, index).newColumnExpression();
++            try {
++                return new ColumnRef(tableRef, index).newColumnExpression();
++            } catch (SQLException e) {
++                throw new RuntimeException(e);
++            }
 +        }
 +
 +        @Override
 +        public Tuple getValue() {
 +            return value;
 +        }
 +
 +        @Override
 +        public void setValue(Tuple value) {
 +            this.value = value;            
 +        }        
 +    }
  }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c8c33a91/pom.xml
----------------------------------------------------------------------
