PHOENIX-2128 Implement using local index with missing columns

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aa0bf583
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aa0bf583
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aa0bf583

Branch: refs/heads/calcite
Commit: aa0bf5834656909e6814204992b6fec19a166b02
Parents: c9f117e
Author: maryannxue <maryann....@gmail.com>
Authored: Sun Apr 3 13:40:16 2016 -0400
Committer: maryannxue <maryann....@gmail.com>
Committed: Sun Apr 3 13:40:16 2016 -0400

----------------------------------------------------------------------
 .../phoenix/calcite/CalciteLocalIndexIT.java    |  18 +
 .../apache/phoenix/calcite/CalciteUtils.java    |  16 +-
 .../phoenix/calcite/CorrelateVariableImpl.java  |  30 ++
 .../apache/phoenix/calcite/PhoenixSchema.java   |   6 +-
 .../apache/phoenix/calcite/PhoenixTable.java    |  64 ++--
 .../apache/phoenix/calcite/TableMapping.java    | 355 +++++++++++++++++++
 .../calcite/jdbc/PhoenixPrepareImpl.java        |   2 +
 .../calcite/rel/PhoenixAbstractAggregate.java   |   6 +-
 .../calcite/rel/PhoenixAbstractProject.java     |   6 +-
 .../calcite/rel/PhoenixClientAggregate.java     |   2 +-
 .../phoenix/calcite/rel/PhoenixClientJoin.java  |  10 +-
 .../calcite/rel/PhoenixClientSemiJoin.java      |   6 +-
 .../phoenix/calcite/rel/PhoenixClientSort.java  |   2 +-
 .../phoenix/calcite/rel/PhoenixCorrelate.java   |  13 +-
 .../phoenix/calcite/rel/PhoenixLimit.java       |   2 +-
 .../apache/phoenix/calcite/rel/PhoenixRel.java  |  11 +-
 .../calcite/rel/PhoenixRelImplementorImpl.java  |  75 +---
 .../phoenix/calcite/rel/PhoenixServerJoin.java  |   8 +-
 .../calcite/rel/PhoenixServerSemiJoin.java      |   8 +-
 .../phoenix/calcite/rel/PhoenixTableScan.java   |  61 ++--
 .../rel/PhoenixToEnumerableConverter.java       |   2 +-
 .../phoenix/calcite/rel/PhoenixUncollect.java   |   6 +-
 .../phoenix/calcite/rel/PhoenixValues.java      |   8 +-
 .../rules/PhoenixFilterScanMergeRule.java       |   3 +-
 .../rules/PhoenixForwardTableScanRule.java      |   3 +-
 .../rules/PhoenixReverseTableScanRule.java      |   3 +-
 .../rules/PhoenixTableScanColumnRefRule.java    |  50 +++
 .../apache/phoenix/execute/CorrelatePlan.java   |   2 +-
 .../apache/phoenix/execute/RuntimeContext.java  |  17 +-
 .../phoenix/execute/RuntimeContextImpl.java     |  57 +--
 .../apache/phoenix/execute/TupleProjector.java  |  12 +-
 .../CorrelateVariableFieldAccessExpression.java |   2 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |   6 +
 .../phoenix/calcite/ToExpressionTest.java       |  11 +-
 .../phoenix/execute/CorrelatePlanTest.java      |  27 +-
 35 files changed, 659 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java
index 90bfc3b..02cf2a1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java
@@ -239,6 +239,24 @@ public class CalciteLocalIndexIT extends BaseCalciteIndexIT {
                         {"1000", 1001},
                         {"1001", 1002}})*/
                 .close();
+        
+        start(props).sql("select * from " + MULTI_TENANT_VIEW1 + " where col0 = 1000")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER], COL1=[$2], COL2=[$3])\n" +
+                           "    PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[=(CAST($0):INTEGER, 1000)], extendedColumns=[{2, 3}])\n")
+                .sameResultAsPhoenixStandalone(0)
+                /*.resultIs(0, new Object[][] {
+                        {"0999", 1000, 1001, 1002}})*/
+                .close();
+        
+        start(props).sql("select id, col0, col2 from " + MULTI_TENANT_VIEW1 + " where col0 = 1000")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER], COL2=[$3])\n" +
+                           "    PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[=(CAST($0):INTEGER, 1000)], extendedColumns=[{3}])\n")
+                .sameResultAsPhoenixStandalone(0)
+                /*.resultIs(0, new Object[][] {
+                        {"0999", 1000, 1002}})*/
+                .close();
 
         props.setProperty("TenantId", "20");
         start(props).sql("select * from " + MULTI_TENANT_VIEW2 + " where id = '0765'")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index 15df943..b41db0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -231,7 +231,7 @@ public class CalciteUtils {
                        public Expression newExpression(RexNode node, Implementor implementor) {
                                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                                try {
-                                       return ComparisonExpression.create(CompareOp.EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable());
+                                       return ComparisonExpression.create(CompareOp.EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
                                } catch (SQLException e) {
                                        throw new RuntimeException(e);
                                }
@@ -244,7 +244,7 @@ public class CalciteUtils {
             public Expression newExpression(RexNode node, Implementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
-                    return ComparisonExpression.create(CompareOp.NOT_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable());
+                    return ComparisonExpression.create(CompareOp.NOT_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
                 } catch (SQLException e) {
                     throw new RuntimeException(e);
                 }
@@ -257,7 +257,7 @@ public class CalciteUtils {
             public Expression newExpression(RexNode node, Implementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
-                    return ComparisonExpression.create(CompareOp.GREATER, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable());
+                    return ComparisonExpression.create(CompareOp.GREATER, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
                 } catch (SQLException e) {
                     throw new RuntimeException(e);
                 }
@@ -270,7 +270,7 @@ public class CalciteUtils {
             public Expression newExpression(RexNode node, Implementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
-                    return ComparisonExpression.create(CompareOp.GREATER_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable());
+                    return ComparisonExpression.create(CompareOp.GREATER_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
                 } catch (SQLException e) {
                     throw new RuntimeException(e);
                 }
@@ -283,7 +283,7 @@ public class CalciteUtils {
             public Expression newExpression(RexNode node, Implementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
-                    return ComparisonExpression.create(CompareOp.LESS, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable());
+                    return ComparisonExpression.create(CompareOp.LESS, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
                 } catch (SQLException e) {
                     throw new RuntimeException(e);
                 }
@@ -296,7 +296,7 @@ public class CalciteUtils {
             public Expression newExpression(RexNode node, Implementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
-                    return ComparisonExpression.create(CompareOp.LESS_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable());
+                    return ComparisonExpression.create(CompareOp.LESS_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
                 } catch (SQLException e) {
                     throw new RuntimeException(e);
                 }
@@ -871,10 +871,10 @@ public class CalciteUtils {
         PDataType fromDataType = childExpr.getDataType();
         
         Expression expr = childExpr;
-        if(fromDataType != null && implementor.getTableRef().getTable().getType() != PTableType.INDEX) {
+        if(fromDataType != null && implementor.getTableMapping().getPTable().getType() != PTableType.INDEX) {
             expr =  convertToRoundExpressionIfNeeded(fromDataType, targetDataType, childExpr);
         }
-        return CoerceExpression.create(expr, targetDataType, SortOrder.getDefault(), expr.getMaxLength(), implementor.getTableRef().getTable().rowKeyOrderOptimizable());
+        return CoerceExpression.create(expr, targetDataType, SortOrder.getDefault(), expr.getMaxLength(), implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
     }
     
     @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/CorrelateVariableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CorrelateVariableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CorrelateVariableImpl.java
new file mode 100644
index 0000000..bde7ddc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CorrelateVariableImpl.java
@@ -0,0 +1,30 @@
+package org.apache.phoenix.calcite;
+
+import org.apache.phoenix.execute.RuntimeContext.CorrelateVariable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class CorrelateVariableImpl implements CorrelateVariable {
+    private final TableMapping tableMapping;
+    private Tuple value;
+    
+    public CorrelateVariableImpl(TableMapping tableMapping) {
+        this.tableMapping = tableMapping;
+    }
+
+    @Override
+    public Expression newExpression(int index) {
+        return tableMapping.newColumnExpression(index);
+    }
+
+    @Override
+    public Tuple getValue() {
+        return value;
+    }
+
+    @Override
+    public void setValue(Tuple value) {
+        this.value = value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
index 5e26017..683a16b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java
@@ -226,7 +226,7 @@ public class PhoenixSchema implements Schema {
         try {
             for (Table table : tables.values()) {
                 if (table instanceof PhoenixTable) {
-                    PTable pTable = ((PhoenixTable) table).pTable;
+                    PTable pTable = ((PhoenixTable) table).tableMapping.getPTable();
                     for (PTable index : pTable.getIndexes()) {
                         addMaterialization(index, path, calciteSchema);
                     }
@@ -247,9 +247,11 @@ public class PhoenixSchema implements Schema {
     private void addMaterialization(PTable index, List<String> path,
             CalciteSchema calciteSchema) throws SQLException {
         index = fixTableMultiTenancy(index);
+        PhoenixTable table = new PhoenixTable(pc, index);
+        tables.put(index.getTableName().getString(), table);
         StringBuffer sb = new StringBuffer();
         sb.append("SELECT");
-        for (PColumn column : PhoenixTable.getMappedColumns(index)) {
+        for (PColumn column : table.getColumns()) {
             String indexColumnName = column.getName().getString();
             String dataColumnName = IndexUtil.getDataColumnName(indexColumnName);
             sb.append(",").append(SchemaUtil.getEscapedFullColumnName(dataColumnName));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
index 1fcc9d3..7b5d287 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
@@ -2,7 +2,6 @@ package org.apache.phoenix.calcite;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.calcite.plan.RelOptTable;
@@ -33,15 +32,18 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.iterate.BaseResultIterators;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -52,47 +54,32 @@ import com.google.common.collect.Lists;
  * Phoenix.
  */
 public class PhoenixTable extends AbstractTable implements TranslatableTable {
-  public final PTable pTable;
-  public final List<PColumn> mappedColumns;
+  public final TableMapping tableMapping;
   public final ImmutableBitSet pkBitSet;
   public final RelCollation collation;
   public final long byteCount;
   public final long rowCount;
   public final PhoenixConnection pc;
-  
-  public static List<PColumn> getMappedColumns(PTable pTable) {
-      if (pTable.getBucketNum() == null
-              && !pTable.isMultiTenant()
-              && pTable.getViewIndexId() == null) {
-          return pTable.getColumns();
-      }
-      
-      List<PColumn> columns = Lists.newArrayList(pTable.getColumns());
-      if (pTable.getViewIndexId() != null) {
-          for (Iterator<PColumn> iter = columns.iterator(); iter.hasNext();) {
-              if (iter.next().getName().getString().equals(MetaDataUtil.VIEW_INDEX_ID_COLUMN_NAME)) {
-                  iter.remove();
-                  break;
-              }
-          }
-      }
-      if (pTable.isMultiTenant()) {
-          columns.remove(pTable.getBucketNum() == null ? 0 : 1);
-      }
-      if (pTable.getBucketNum() != null) {
-          columns.remove(0);
-      }
-      return columns;
-  }
 
-  public PhoenixTable(PhoenixConnection pc, PTable pTable) {
+  public PhoenixTable(PhoenixConnection pc, PTable pTable) throws SQLException {
       this.pc = Preconditions.checkNotNull(pc);
-      this.pTable = Preconditions.checkNotNull(pTable);
-      this.mappedColumns = getMappedColumns(pTable);
+      PTable extendedTable = null;
+      if (pTable.getIndexType() == IndexType.LOCAL) {
+          ColumnResolver x = FromCompiler.getResolver(
+                  NamedTableNode.create(null,
+                          TableName.create(pTable.getParentSchemaName().getString(),
+                                  pTable.getParentTableName().getString()),
+                          ImmutableList.<ColumnDef>of()), pc);
+          final List<TableRef> tables = x.getTables();
+          assert tables.size() == 1;
+          extendedTable = tables.get(0).getTable();          
+      }
+      this.tableMapping = extendedTable == null ? new TableMapping(pTable) : new TableMapping(pTable, extendedTable);
       List<Integer> pkPositions = Lists.<Integer> newArrayList();
       List<RelFieldCollation> fieldCollations = Lists.<RelFieldCollation> newArrayList();
-      for (int i = 0; i < mappedColumns.size(); i++) {
-          PColumn column = mappedColumns.get(i);
+      final List<PColumn> columns = tableMapping.getMappedColumns();
+      for (int i = 0; i < columns.size(); i++) {
+          PColumn column = columns.get(i);
           if (SchemaUtil.isPKColumn(column)) {
               SortOrder sortOrder = column.getSortOrder();
               pkPositions.add(i);
@@ -133,16 +120,17 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable {
       }
     }
     
-    public PTable getTable() {
-       return pTable;
+    public List<PColumn> getColumns() {
+        return tableMapping.getMappedColumns();
     }
 
     @SuppressWarnings("rawtypes")
     @Override
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
         final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
-        for (int i = 0; i < mappedColumns.size(); i++) {
-            PColumn pColumn = mappedColumns.get(i);
+        final List<PColumn> columns = tableMapping.getMappedColumns();
+        for (int i = 0; i < columns.size(); i++) {
+            PColumn pColumn = columns.get(i);
             final PDataType baseType = 
                     pColumn.getDataType().isArrayType() ?
                             
PDataType.fromTypeId(pColumn.getDataType().getSqlType() - 
PDataType.ARRAY_TYPE_BASE) 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
new file mode 100644
index 0000000..839370d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java
@@ -0,0 +1,355 @@
+package org.apache.phoenix.calcite;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptUtil.InputFinder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.ExpressionProjector;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.TupleProjectionCompiler;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.ProjectedColumn;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class TableMapping {
+    private final TableRef tableRef;
+    private final List<PColumn> mappedColumns;
+    private final int extendedColumnsOffset;
+    private final TableRef extendedTableRef;
+
+    public TableMapping(PTable table) {
+        this.tableRef = new TableRef(table);
+        this.mappedColumns = getMappedColumns(table);
+        this.extendedColumnsOffset = mappedColumns.size();
+        this.extendedTableRef = null;
+    }
+
+    public TableMapping(PTable table, PTable dataTable) throws SQLException {
+        this.tableRef = new TableRef(table);
+        this.mappedColumns = Lists.newArrayList();
+        this.mappedColumns.addAll(getMappedColumns(table));
+        this.extendedColumnsOffset = mappedColumns.size();
+        Set<String> names = Sets.newHashSet();
+        for (PColumn column : this.mappedColumns) {
+            names.add(column.getName().getString());
+        }
+        TableRef dataTableRef = new TableRef(dataTable);
+        List<PColumn> projectedColumns = new ArrayList<PColumn>();
+        for (PColumn sourceColumn : dataTable.getColumns()) {
+            if (!SchemaUtil.isPKColumn(sourceColumn)) {
+                String colName = IndexUtil.getIndexColumnName(sourceColumn);
+                if (!names.contains(colName)) {
+                    ColumnRef sourceColumnRef =
+                            new ColumnRef(dataTableRef, sourceColumn.getPosition());
+                    PColumn column = new ProjectedColumn(PNameFactory.newName(colName),
+                            sourceColumn.getFamilyName(), projectedColumns.size(),
+                            sourceColumn.isNullable(), sourceColumnRef);
+                    projectedColumns.add(column);
+                }
+            }            
+        }
+        this.mappedColumns.addAll(projectedColumns);
+        PTable extendedTable = PTableImpl.makePTable(dataTable.getTenantId(),
+                TupleProjectionCompiler.PROJECTED_TABLE_SCHEMA, dataTable.getName(),
+                PTableType.PROJECTED, null, dataTable.getTimeStamp(),
+                dataTable.getSequenceNumber(), dataTable.getPKName(), null,
+                projectedColumns, null, null, Collections.<PTable>emptyList(),
+                dataTable.isImmutableRows(), Collections.<PName>emptyList(), null, null,
+                dataTable.isWALDisabled(), false, dataTable.getStoreNulls(),
+                dataTable.getViewType(), null, null, dataTable.rowKeyOrderOptimizable(),
+                dataTable.isTransactional(), dataTable.getUpdateCacheFrequency(),
+                dataTable.getIndexDisableTimestamp());
+        this.extendedTableRef = new TableRef(extendedTable);
+    }
+    
+    public TableRef getTableRef() {
+        return tableRef;
+    }
+    
+    public PTable getPTable() {
+        return tableRef.getTable();
+    }
+    
+    public List<PColumn> getMappedColumns() {
+        return mappedColumns;
+    }
+    
+    public boolean hasExtendedColumns() {
+        return extendedTableRef != null;
+    }
+    
+    public ColumnExpression newColumnExpression(int index) {
+        ColumnRef colRef = new ColumnRef(
+                index < extendedColumnsOffset ? tableRef : extendedTableRef,
+                this.mappedColumns.get(index).getPosition());
+        return colRef.newColumnExpression();
+    }
+    
+    public ImmutableBitSet getDefaultExtendedColumnRef() {
+        return ImmutableBitSet.range(extendedColumnsOffset, mappedColumns.size());
+    }
+    
+    public ImmutableBitSet getExtendedColumnRef(List<RexNode> exprs) {
+        if (!hasExtendedColumns()) {
+            return ImmutableBitSet.of();
+        }
+        
+        ImmutableBitSet.Builder builder = ImmutableBitSet.builder();
+        for (RexNode expr : exprs) {
+            builder.addAll(InputFinder.analyze(expr).inputBitSet.build());
+        }
+        for (int i = 0; i < extendedColumnsOffset; i++) {
+            builder.clear(i);
+        }
+        return builder.build();
+    }
+   
+    public Pair<Integer, Integer> getExtendedColumnReferenceCount(ImmutableBitSet columnRef) {
+        Set<String> cf = Sets.newHashSet();
+        int columnCount = 0;
+        for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) {
+            if (columnRef.get(i)) {
+                PColumn dataColumn = ((ProjectedColumn) mappedColumns.get(i))
+                        .getSourceColumnRef().getColumn();
+                cf.add(dataColumn.getFamilyName().getString());
+                columnCount++;
+            }
+        }
+        return new Pair<Integer, Integer>(cf.size(), columnCount);
+    }
+    
+    public PTable createProjectedTable(boolean retainPKColumns) {
+        List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
+        List<PColumn> columns = retainPKColumns ?
+                  tableRef.getTable().getColumns() : mappedColumns.subList(0, extendedColumnsOffset);
+        for (PColumn column : columns) {
+            sourceColumnRefs.add(new ColumnRef(tableRef, column.getPosition()));
+        }
+        if (extendedColumnsOffset < mappedColumns.size()) {
+            for (PColumn column : mappedColumns.subList(extendedColumnsOffset, mappedColumns.size())) {
+                sourceColumnRefs.add(new ColumnRef(extendedTableRef, column.getPosition()));
+            }
+        }
+        
+        try {
+            return TupleProjectionCompiler.createProjectedTable(tableRef, sourceColumnRefs, retainPKColumns);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    public TupleProjector createTupleProjector(boolean retainPKColumns) {
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        List<Expression> exprs = Lists.<Expression> newArrayList();
+        for (int i = 0; i < mappedColumns.size(); i++) {
+            if (!SchemaUtil.isPKColumn(mappedColumns.get(i)) || !retainPKColumns) {
+                Expression expr = newColumnExpression(i);
+                exprs.add(expr);
+                builder.addField(expr);
+            }
+        }
+        
+        return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
+    }
+    
+    public RowProjector createRowProjector() {
+        List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList();
+        for (int i = 0; i < mappedColumns.size(); i++) {
+            PColumn column = mappedColumns.get(i);
+            Expression expr = newColumnExpression(i); // Do not use column.position() here.
+            columnProjectors.add(new ExpressionProjector(column.getName().getString(), tableRef.getTable().getName().getString(), expr, false));
+        }
+        // TODO get estimate row size
+        return new RowProjector(columnProjectors, 0, false);        
+    }
+    
+    public void setupScanForExtendedTable(Scan scan, ImmutableBitSet extendedColumnRef,
+            PhoenixConnection connection) throws SQLException {
+        if (extendedTableRef == null || extendedColumnRef.isEmpty()) {
+            return;
+        }
+        
+        TableRef dataTableRef = null;
+        List<PColumn> dataColumns = Lists.newArrayList();
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        List<Expression> exprs = Lists.<Expression> newArrayList();
+        for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) {
+            ProjectedColumn column = (ProjectedColumn) mappedColumns.get(i);
+            builder.addField(column);
+            if (extendedColumnRef.get(i)) {
+                dataColumns.add(column.getSourceColumnRef().getColumn());
+                exprs.add(column.getSourceColumnRef().newColumnExpression());
+                if (dataTableRef == null) {
+                    dataTableRef = column.getSourceColumnRef().getTableRef();
+                }
+            } else {
+                exprs.add(LiteralExpression.newConstant(null));
+            }
+        }
+        if (dataColumns.isEmpty()) {
+            return;
+        }
+        
+        // Set data columns to be join back from data table.
+        serializeDataTableColumnsToJoin(scan, dataColumns);
+        // Set tuple projector of the data columns.
+        TupleProjector projector = new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
+        TupleProjector.serializeProjectorIntoScan(scan, projector, IndexUtil.INDEX_PROJECTOR);
+        PTable dataTable = dataTableRef.getTable();
+        // Set index maintainer of the local index.
+        serializeIndexMaintainerIntoScan(scan, dataTable, connection);
+        // Set view constants if exists.
+        serializeViewConstantsIntoScan(scan, dataTable);
+    }
+
+    private static void serializeDataTableColumnsToJoin(Scan scan, List<PColumn> dataColumns) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            WritableUtils.writeVInt(output, dataColumns.size());
+            for (PColumn column : dataColumns) {
+                Bytes.writeByteArray(output, column.getFamilyName().getBytes());
+                Bytes.writeByteArray(output, column.getName().getBytes());
+            }
+            scan.setAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Serializes the index maintainer of this mapping's table into the scan.
+     *
+     * The data table's indexes are searched for the first LOCAL index whose
+     * name equals the name of this mapping's table; the serialized result is
+     * stored under the LOCAL_INDEX_BUILD attribute. When the data table is
+     * transactional, the encoded transaction of the connection's mutation
+     * state is stored under TX_STATE as well.
+     *
+     * @param scan       scan that receives the attributes
+     * @param dataTable  data table whose index list is searched
+     * @param connection connection passed to the serializer and used for the
+     *                   transaction state
+     * @throws SQLException if a called Phoenix API reports one
+     */
+    private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable, PhoenixConnection connection) throws SQLException {
+        PName indexName = getPTable().getName();
+        List<PTable> localIndexes = Lists.newArrayListWithExpectedSize(1);
+        for (PTable candidate : dataTable.getIndexes()) {
+            if (!candidate.getName().equals(indexName)
+                    || candidate.getIndexType() != IndexType.LOCAL) {
+                continue;
+            }
+            // Only the first match is kept.
+            localIndexes.add(candidate);
+            break;
+        }
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        IndexMaintainer.serialize(dataTable, ptr, localIndexes, connection);
+        byte[] maintainerBytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+        scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, maintainerBytes);
+        if (!dataTable.isTransactional()) {
+            return;
+        }
+        byte[] txState = connection.getMutationState().encodeTransaction();
+        scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+    }
+
+    /**
+     * Collects the view constants of the data table's PK columns and, if
+     * there are any, stores them on the scan.
+     *
+     * Nothing is done unless the data table is of type VIEW. Leading PK
+     * columns are skipped: one position when the table has a bucket number
+     * and one more when it is multi-tenant.
+     *
+     * @param scan      scan that receives the view constants
+     * @param dataTable data table whose PK columns are inspected
+     * @throws IllegalStateException if a column reports a view constant whose
+     *                               value cannot be obtained
+     */
+    private static void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {
+        if (dataTable.getType() != PTableType.VIEW) {
+            return;
+        }
+        int firstPkPos = 0;
+        if (dataTable.getBucketNum() != null) {
+            firstPkPos++;
+        }
+        if (dataTable.isMultiTenant()) {
+            firstPkPos++;
+        }
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        List<PColumn> pkColumns = dataTable.getPKColumns();
+        List<byte[]> collected = Lists.newArrayList();
+        for (int pos = firstPkPos; pos < pkColumns.size(); pos++) {
+            PColumn pkColumn = pkColumns.get(pos);
+            if (pkColumn.getViewConstant() == null) {
+                continue;
+            }
+            if (!IndexUtil.getViewConstantValue(pkColumn, ptr)) {
+                throw new IllegalStateException();
+            }
+            collected.add(ByteUtil.copyKeyBytesIfNecessary(ptr));
+        }
+        if (collected.isEmpty()) {
+            return;
+        }
+        byte[][] viewConstants = collected.toArray(new byte[collected.size()][]);
+        serializeViewConstantsIntoScan(viewConstants, scan);
+    }
+
+    /**
+     * Writes the given view constants into the scan's VIEW_CONSTANTS
+     * attribute.
+     *
+     * The attribute value is a vint holding the number of constants,
+     * followed by each constant written as a byte array, in array order.
+     *
+     * @param viewConstants view constant values to store
+     * @param scan          scan that receives the attribute
+     * @throws RuntimeException wrapping any IOException raised while writing
+     */
+    private static void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(buffer);
+        try {
+            WritableUtils.writeVInt(out, viewConstants.length);
+            for (int i = 0; i < viewConstants.length; i++) {
+                Bytes.writeByteArray(out, viewConstants[i]);
+            }
+            byte[] serialized = buffer.toByteArray();
+            scan.setAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS, serialized);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                buffer.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    /**
+     * Returns the columns of the table with certain leading columns removed.
+     *
+     * When the table has no bucket number, is not multi-tenant and has no
+     * view index id, the table's own column list is returned as is.
+     * Otherwise a copy is returned from which the following are removed:
+     * the first column named VIEW_INDEX_ID_COLUMN_NAME (if the table has a
+     * view index id), the column at the tenant position (if multi-tenant),
+     * and the first column (if the table has a bucket number).
+     *
+     * @param pTable table whose columns are mapped
+     * @return the table's column list, or a filtered copy of it
+     */
+    private static List<PColumn> getMappedColumns(PTable pTable) {
+        boolean salted = pTable.getBucketNum() != null;
+        boolean multiTenant = pTable.isMultiTenant();
+        boolean viewIndex = pTable.getViewIndexId() != null;
+        if (!salted && !multiTenant && !viewIndex) {
+            return pTable.getColumns();
+        }
+
+        List<PColumn> mapped = Lists.newArrayList(pTable.getColumns());
+        if (viewIndex) {
+            for (int pos = 0; pos < mapped.size(); pos++) {
+                String columnName = mapped.get(pos).getName().getString();
+                if (columnName.equals(MetaDataUtil.VIEW_INDEX_ID_COLUMN_NAME)) {
+                    mapped.remove(pos);
+                    break;
+                }
+            }
+        }
+        if (multiTenant) {
+            // The tenant column sits after the first column when the table
+            // has a bucket number, otherwise it is the first column.
+            int tenantPos = salted ? 1 : 0;
+            mapped.remove(tenantPos);
+        }
+        if (salted) {
+            mapped.remove(0);
+        }
+
+        return mapped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
index ae09b42..7863523 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java
@@ -38,6 +38,7 @@ import 
org.apache.phoenix.calcite.rules.PhoenixMergeSortUnionRule;
 import org.apache.phoenix.calcite.rules.PhoenixOrderedAggregateRule;
 import org.apache.phoenix.calcite.rules.PhoenixReverseTableScanRule;
 import org.apache.phoenix.calcite.rules.PhoenixSortServerJoinTransposeRule;
+import org.apache.phoenix.calcite.rules.PhoenixTableScanColumnRefRule;
 
 import com.google.common.base.Function;
 
@@ -93,6 +94,7 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl {
             planner.addRule(rule);
         }
         planner.addRule(PhoenixFilterScanMergeRule.INSTANCE);
+        planner.addRule(PhoenixTableScanColumnRefRule.INSTANCE);
         planner.addRule(PhoenixCompactClientSortRule.INSTANCE);
         planner.addRule(PhoenixJoinSingleValueAggregateMergeRule.INSTANCE);
         planner.addRule(PhoenixMergeSortUnionRule.INSTANCE);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
index 16c97cc..dff5d4e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
@@ -18,6 +18,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
@@ -32,7 +33,6 @@ import 
org.apache.phoenix.expression.function.AggregateFunction;
 import org.apache.phoenix.expression.function.SingleAggregateFunction;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
-import org.apache.phoenix.schema.TableRef;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -193,8 +193,8 @@ abstract public class PhoenixAbstractAggregate extends 
Aggregate implements Phoe
         }
         
         TupleProjector tupleProjector = implementor.project(exprs);
-        PTable projectedTable = implementor.createProjectedTable();
-        implementor.setTableRef(new TableRef(projectedTable));
+        PTable projectedTable = 
implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns);
+        implementor.setTableMapping(new TableMapping(projectedTable));
         return new TupleProjectionPlan(plan, tupleProjector, null);
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
index 464109d..6e09fa5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
@@ -15,10 +15,10 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 
 import com.google.common.collect.Lists;
 
@@ -55,8 +55,8 @@ abstract public class PhoenixAbstractProject extends Project 
implements PhoenixR
             exprs.add(CalciteUtils.toExpression(project, implementor));
         }
         TupleProjector tupleProjector = implementor.project(exprs);
-        PTable projectedTable = implementor.createProjectedTable();
-        implementor.setTableRef(new TableRef(projectedTable));
+        PTable projectedTable = 
implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns);
+        implementor.setTableMapping(new TableMapping(projectedTable));
 
         return tupleProjector;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
index 5aaff2f..680e871 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
@@ -63,7 +63,7 @@ public class PhoenixClientAggregate extends 
PhoenixAbstractAggregate {
         QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
         implementor.popContext();
         
-        TableRef tableRef = implementor.getTableRef();
+        TableRef tableRef = implementor.getTableMapping().getTableRef();
         PhoenixStatement stmt = plan.getContext().getStatement();
         StatementContext context;
         try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
index 2df5f74..53c0cf4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@ -19,6 +19,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
@@ -115,12 +116,12 @@ public class PhoenixClientJoin extends 
PhoenixAbstractJoin {
 
         implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().retainPKColumns && 
getJoinType() != JoinRelType.FULL, true, getColumnRefList(0)));
         QueryPlan leftPlan = implementInput(implementor, 0, leftExprs);
-        PTable leftTable = implementor.getTableRef().getTable();
+        PTable leftTable = implementor.getTableMapping().getPTable();
         implementor.popContext();
 
         implementor.pushContext(new ImplementorContext(false, true, 
getColumnRefList(1)));
         QueryPlan rightPlan = implementInput(implementor, 1, rightExprs);
-        PTable rightTable = implementor.getTableRef().getTable();
+        PTable rightTable = implementor.getTableMapping().getPTable();
         implementor.popContext();
         
         JoinType type = CalciteUtils.convertJoinType(getJoinType());
@@ -130,8 +131,9 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin {
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
-        TableRef tableRef = new TableRef(joinedTable);
-        implementor.setTableRef(tableRef);
+        TableMapping tableMapping = new TableMapping(joinedTable);
+        implementor.setTableMapping(tableMapping);
+        TableRef tableRef = tableMapping.getTableRef();
         ColumnResolver resolver;
         try {
             resolver = FromCompiler.getResolver(tableRef);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
index 31dfc3d..cec1181 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
@@ -16,6 +16,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.QueryPlan;
@@ -99,7 +100,8 @@ public class PhoenixClientSemiJoin extends 
PhoenixAbstractSemiJoin implements
 
         implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().retainPKColumns && 
getJoinType() != JoinRelType.FULL, true, getColumnRefList(0)));
         QueryPlan leftPlan = implementInput(implementor, 0, leftExprs);
-        TableRef joinedTable = implementor.getTableRef();
+        TableMapping tableMapping = implementor.getTableMapping();
+        TableRef joinedTable = tableMapping.getTableRef();
         implementor.popContext();
 
         implementor.pushContext(new ImplementorContext(false, true, 
getColumnRefList(1)));
@@ -107,7 +109,7 @@ public class PhoenixClientSemiJoin extends 
PhoenixAbstractSemiJoin implements
         implementor.popContext();
         
         JoinType type = JoinType.Semi;
-        implementor.setTableRef(joinedTable);
+        implementor.setTableMapping(tableMapping);
         PhoenixStatement stmt = leftPlan.getContext().getStatement();
         ColumnResolver resolver = leftPlan.getContext().getResolver();
         StatementContext context = new StatementContext(stmt, resolver, new 
Scan(), new SequenceManager(stmt));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
index 59fb639..7c73530 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
@@ -59,7 +59,7 @@ public class PhoenixClientSort extends PhoenixAbstractSort {
             
         QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
         
-        TableRef tableRef = implementor.getTableRef();
+        TableRef tableRef = implementor.getTableMapping().getTableRef();
         PhoenixStatement stmt = plan.getContext().getStatement();
         StatementContext context;
         try {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
index 496dfdb..d70c5be 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
@@ -17,13 +17,14 @@ import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.CorrelateVariableImpl;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
 import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.CorrelatePlan;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 
 import com.google.common.base.Supplier;
 
@@ -73,14 +74,14 @@ public class PhoenixCorrelate extends Correlate implements 
PhoenixRel {
     public QueryPlan implement(Implementor implementor) {
         implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().retainPKColumns, true, 
ImmutableIntList.identity(getLeft().getRowType().getFieldCount())));
         QueryPlan leftPlan = implementor.visitInput(0, (PhoenixRel) getLeft());
-        PTable leftTable = implementor.getTableRef().getTable();
+        PTable leftTable = implementor.getTableMapping().getPTable();
         implementor.popContext();
 
-        
implementor.getRuntimeContext().defineCorrelateVariable(getCorrelVariable(), 
implementor.getTableRef());
+        
implementor.getRuntimeContext().defineCorrelateVariable(getCorrelVariable(), 
new CorrelateVariableImpl(implementor.getTableMapping()));
 
         implementor.pushContext(new ImplementorContext(false, true, 
ImmutableIntList.identity(getRight().getRowType().getFieldCount())));
         QueryPlan rightPlan = implementor.visitInput(1, (PhoenixRel) 
getRight());
-        PTable rightTable = implementor.getTableRef().getTable();
+        PTable rightTable = implementor.getTableMapping().getPTable();
         implementor.popContext();
                 
         JoinType type = CalciteUtils.convertSemiJoinType(getJoinType());
@@ -90,8 +91,8 @@ public class PhoenixCorrelate extends Correlate implements 
PhoenixRel {
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
-        TableRef tableRef = new TableRef(joinedTable);
-        implementor.setTableRef(tableRef);
+        TableMapping tableMapping = new TableMapping(joinedTable);
+        implementor.setTableMapping(tableMapping);
 
         return new CorrelatePlan(leftPlan, rightPlan, getCorrelVariable(), 
                 type, false, implementor.getRuntimeContext(), joinedTable, 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
index 3edcf60..236af84 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
@@ -89,7 +89,7 @@ public class PhoenixLimit extends SingleRel implements 
PhoenixRel {
         }
         
         return new ClientScanPlan(plan.getContext(), plan.getStatement(), 
-                implementor.getTableRef(), RowProjector.EMPTY_PROJECTOR, 
+                implementor.getTableMapping().getTableRef(), 
RowProjector.EMPTY_PROJECTOR, 
                 fetchValue, null, OrderBy.EMPTY_ORDER_BY, plan);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
index f6b0e97..5d8b7b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -6,9 +6,9 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.PhoenixSequence;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.SequenceValueExpression;
 import org.apache.phoenix.execute.RuntimeContext;
@@ -16,8 +16,6 @@ import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.ColumnExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.parse.SequenceValueParseNode;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
 
 /**
@@ -75,15 +73,12 @@ public interface PhoenixRel extends RelNode {
     Expression newFieldAccessExpression(String variableId, int index, 
PDataType type);
     SequenceValueExpression newSequenceExpression(PhoenixSequence seq, 
SequenceValueParseNode.Op op);
     RuntimeContext getRuntimeContext();
-    void setTableRef(TableRef tableRef);
-    TableRef getTableRef();
+    void setTableMapping(TableMapping tableMapping);
+    TableMapping getTableMapping();
     void setSequenceManager(SequenceManager sequenceManager);
     void pushContext(ImplementorContext context);
     ImplementorContext popContext();
     ImplementorContext getCurrentContext();
-    PTable createProjectedTable();
-    TupleProjector createTupleProjector();
-    RowProjector createRowProjector();
     TupleProjector project(List<Expression> exprs);
   }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
index a33cd18..759f09b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
@@ -5,18 +5,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Stack;
 
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.calcite.PhoenixSequence;
-import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext;
-import org.apache.phoenix.compile.ColumnProjector;
-import org.apache.phoenix.compile.ExpressionProjector;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.SequenceValueExpression;
-import org.apache.phoenix.compile.TupleProjectionCompiler;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.execute.RuntimeContext;
 import org.apache.phoenix.execute.TupleProjector;
@@ -26,7 +20,6 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SequenceValueParseNode;
 import org.apache.phoenix.parse.TableName;
-import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
@@ -35,19 +28,14 @@ import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.util.SchemaUtil;
-
 import com.google.common.collect.Lists;
 
 public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
     private final RuntimeContext runtimeContext;
-       private TableRef tableRef;
-       private List<PColumn> mappedColumns;
        private Stack<ImplementorContext> contextStack;
        private SequenceManager sequenceManager;
+       private TableMapping tableMapping;
        
        public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) {
            this.runtimeContext = runtimeContext;
@@ -61,14 +49,13 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
 
        @Override
        public ColumnExpression newColumnExpression(int index) {
-               ColumnRef colRef = new ColumnRef(this.tableRef, 
this.mappedColumns.get(index).getPosition());
-               return colRef.newColumnExpression();
+               return tableMapping.newColumnExpression(index);
        }
     
     @SuppressWarnings("rawtypes")
     @Override
     public Expression newFieldAccessExpression(String variableId, int index, 
PDataType type) {
-        Expression fieldAccessExpr = 
runtimeContext.newCorrelateVariableReference(variableId, index);
+        Expression fieldAccessExpr = 
runtimeContext.getCorrelateVariable(variableId).newExpression(index);
         return new CorrelateVariableFieldAccessExpression(runtimeContext, 
variableId, fieldAccessExpr);
     }
     
@@ -89,14 +76,13 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
     }
 
     @Override
-       public void setTableRef(TableRef tableRef) {
-               this.tableRef = tableRef;
-               this.mappedColumns = 
PhoenixTable.getMappedColumns(tableRef.getTable());
+       public void setTableMapping(TableMapping tableMapping) {
+               this.tableMapping = tableMapping;
        }
     
     @Override
-    public TableRef getTableRef() {
-        return this.tableRef;
+    public TableMapping getTableMapping() {
+        return this.tableMapping;
     }
     
     @Override
@@ -120,49 +106,6 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
     }
     
     @Override
-    public PTable createProjectedTable() {
-        List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
-        List<PColumn> columns = getCurrentContext().retainPKColumns ?
-                  getTableRef().getTable().getColumns() : mappedColumns;
-        for (PColumn column : columns) {
-            sourceColumnRefs.add(new ColumnRef(getTableRef(), 
column.getPosition()));
-        }
-        
-        try {
-            return TupleProjectionCompiler.createProjectedTable(getTableRef(), 
sourceColumnRefs, getCurrentContext().retainPKColumns);
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    @Override
-    public TupleProjector createTupleProjector() {
-        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
-        List<Expression> exprs = Lists.<Expression> newArrayList();
-        for (PColumn column : mappedColumns) {
-            if (!SchemaUtil.isPKColumn(column) || 
!getCurrentContext().retainPKColumns) {
-                Expression expr = new ColumnRef(tableRef, 
column.getPosition()).newColumnExpression();
-                exprs.add(expr);
-                builder.addField(expr);                
-            }
-        }
-        
-        return new TupleProjector(builder.build(), exprs.toArray(new 
Expression[exprs.size()]));
-    }
-    
-    @Override
-    public RowProjector createRowProjector() {
-        List<ColumnProjector> columnProjectors = 
Lists.<ColumnProjector>newArrayList();
-        for (int i = 0; i < mappedColumns.size(); i++) {
-            PColumn column = mappedColumns.get(i);
-            Expression expr = newColumnExpression(i); // Do not use 
column.position() here.
-            columnProjectors.add(new 
ExpressionProjector(column.getName().getString(), 
getTableRef().getTable().getName().getString(), expr, false));
-        }
-        // TODO get estimate row size
-        return new RowProjector(columnProjectors, 0, false);        
-    }
-    
-    @Override
     public TupleProjector project(List<Expression> exprs) {
         KeyValueSchema.KeyValueSchemaBuilder builder = new 
KeyValueSchema.KeyValueSchemaBuilder(0);
         List<PColumn> columns = Lists.<PColumn>newArrayList();
@@ -180,7 +123,7 @@ public class PhoenixRelImplementorImpl implements 
PhoenixRel.Implementor {
                     null, null, columns, null, null, 
Collections.<PTable>emptyList(),
                     false, Collections.<PName>emptyList(), null, null, false, 
false, false, null,
                     null, null, true, false, 0, 0);
-            this.setTableRef(new TableRef(CalciteUtils.createTempAlias(), 
pTable, HConstants.LATEST_TIMESTAMP, false));
+            this.setTableMapping(new TableMapping(pTable));
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
index 2f09a9d..8690377 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -17,6 +17,7 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
 import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryPlan;
@@ -28,7 +29,6 @@ import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -111,12 +111,12 @@ public class PhoenixServerJoin extends 
PhoenixAbstractJoin {
 
         implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().retainPKColumns, true, 
getColumnRefList(0)));
         QueryPlan leftPlan = implementInput(implementor, 0, null);
-        PTable leftTable = implementor.getTableRef().getTable();
+        PTable leftTable = implementor.getTableMapping().getPTable();
         implementor.popContext();
 
         implementor.pushContext(new ImplementorContext(false, true, 
getColumnRefList(1)));
         QueryPlan rightPlan = implementInput(implementor, 1, rightExprs);
-        PTable rightTable = implementor.getTableRef().getTable();
+        PTable rightTable = implementor.getTableMapping().getPTable();
         implementor.popContext();
         
         JoinType type = CalciteUtils.convertJoinType(getJoinType());
@@ -126,7 +126,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin {
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
-        implementor.setTableRef(new TableRef(joinedTable));
+        implementor.setTableMapping(new TableMapping(joinedTable));
         
         // Compile left conditions against the joined table due to 
implementation of HashJoinRegionScanner.
         for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); 
iter.hasNext();) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
index 74c3e4d..2b133aa 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
@@ -15,6 +15,7 @@ import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
@@ -24,7 +25,6 @@ import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
@@ -98,7 +98,7 @@ public class PhoenixServerSemiJoin extends 
PhoenixAbstractSemiJoin {
 
         implementor.pushContext(new 
ImplementorContext(implementor.getCurrentContext().retainPKColumns, true, 
getColumnRefList(0)));
         QueryPlan leftPlan = implementInput(implementor, 0, leftExprs);
-        TableRef joinedTable = implementor.getTableRef();
+        TableMapping tableMapping = implementor.getTableMapping();
         implementor.popContext();
 
         implementor.pushContext(new ImplementorContext(false, true, 
getColumnRefList(1)));
@@ -106,10 +106,10 @@ public class PhoenixServerSemiJoin extends 
PhoenixAbstractSemiJoin {
         implementor.popContext();
         
         JoinType type = JoinType.Semi;
-        implementor.setTableRef(joinedTable);
+        implementor.setTableMapping(tableMapping);
         @SuppressWarnings("unchecked")
         HashJoinInfo hashJoinInfo = new HashJoinInfo(
-                joinedTable.getTable(), 
+                tableMapping.getPTable(), 
                 new ImmutableBytesPtr[] {new ImmutableBytesPtr()}, 
                 (List<Expression>[]) (new List[] {leftExprs}), 
                 new JoinType[] {type}, new boolean[] {true}, 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index 32c7a90..54b1f27 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -18,10 +18,11 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -45,7 +46,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
@@ -64,17 +64,18 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
     public final RexNode filter;
     public final ScanOrder scanOrder;
     public final ScanRanges scanRanges;
+    public final ImmutableBitSet extendedColumnRef;
     
     protected final Long estimatedBytes;
     protected final float rowCountFactor;
     
     public static PhoenixTableScan create(RelOptCluster cluster, final 
RelOptTable table) {
         return create(cluster, table, null,
-                getDefaultScanOrder(table.unwrap(PhoenixTable.class)));
+                getDefaultScanOrder(table.unwrap(PhoenixTable.class)), null);
     }
 
     public static PhoenixTableScan create(RelOptCluster cluster, final 
RelOptTable table, 
-            RexNode filter, final ScanOrder scanOrder) {
+            RexNode filter, final ScanOrder scanOrder, ImmutableBitSet 
extendedColumnRef) {
         final RelTraitSet traits =
                 cluster.traitSetOf(PhoenixConvention.SERVER)
                 .replaceIfs(RelCollationTraitDef.INSTANCE,
@@ -87,25 +88,34 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
                         return scanOrder == ScanOrder.FORWARD ? collations : 
reverse(collations);
                     }
                 });
-        return new PhoenixTableScan(cluster, traits, table, filter, scanOrder);
+        return new PhoenixTableScan(cluster, traits, table, filter, scanOrder, 
extendedColumnRef);
     }
 
-    private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, 
RelOptTable table, RexNode filter, ScanOrder scanOrder) {
+    private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits,
+            RelOptTable table, RexNode filter, ScanOrder scanOrder,
+            ImmutableBitSet extendedColumnRef) {
         super(cluster, traits, table);
         this.filter = filter;
         this.scanOrder = scanOrder;
         final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
         this.rowCountFactor = phoenixTable.pc.getQueryServices()
-                .getProps().getFloat(PhoenixRel.ROW_COUNT_FACTOR, 1f);        
+                .getProps().getFloat(PhoenixRel.ROW_COUNT_FACTOR, 1f);
         try {
             // TODO simplify this code
-            PTable pTable = phoenixTable.getTable();
-            TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), 
pTable, HConstants.LATEST_TIMESTAMP, false);
+            TableMapping tableMapping = phoenixTable.tableMapping;
+            PTable pTable = tableMapping.getPTable();
             SelectStatement select = SelectStatement.SELECT_ONE;
             PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
-            ColumnResolver resolver = FromCompiler.getResolver(tableRef);
+            ColumnResolver resolver = 
FromCompiler.getResolver(tableMapping.getTableRef());
             StatementContext context = new StatementContext(stmt, resolver, 
new Scan(), new SequenceManager(stmt));
-            if (filter != null) {
+            if (extendedColumnRef == null) {
+                extendedColumnRef = tableMapping.getDefaultExtendedColumnRef();
+            }
+            if (filter == null) {
+                this.extendedColumnRef = extendedColumnRef;
+            } else {
+                this.extendedColumnRef = extendedColumnRef.union(
+                        
tableMapping.getExtendedColumnRef(ImmutableList.of(filter)));
                 // We use a implementor with a special implementation for 
field access
                 // here, which translates RexFieldAccess into a 
LiteralExpression
                 // with a sample value. This will achieve 3 goals at a time:
@@ -124,12 +134,13 @@ public class PhoenixTableScan extends TableScan 
implements PhoenixRel {
                         }
                     }                    
                 };
-                tmpImplementor.setTableRef(tableRef);
+                tmpImplementor.setTableMapping(tableMapping);
                 Expression filterExpr = CalciteUtils.toExpression(filter, 
tmpImplementor);
                 filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, 
select, filterExpr);
                 WhereCompiler.setScanFilter(context, select, filterExpr, true, 
false);
             }        
             this.scanRanges = context.getScanRanges();
+            // TODO Get estimated byte count based on column reference list.
             this.estimatedBytes = 
BaseResultIterators.getEstimatedCount(context, pTable).getSecond();
         } catch (SQLException e) {
             throw new RuntimeException(e);
@@ -170,7 +181,8 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
     public RelWriter explainTerms(RelWriter pw) {
         return super.explainTerms(pw)
             .itemIf("filter", filter, filter != null)
-            .itemIf("scanOrder", scanOrder, scanOrder != ScanOrder.NONE);
+            .itemIf("scanOrder", scanOrder, scanOrder != ScanOrder.NONE)
+            .itemIf("extendedColumns", extendedColumnRef, 
!extendedColumnRef.isEmpty());
     }
 
     @Override
@@ -188,6 +200,10 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
                 byteCount = phoenixTable.byteCount;
             }
         }
+        Pair<Integer, Integer> columnRefCount =
+                
phoenixTable.tableMapping.getExtendedColumnReferenceCount(extendedColumnRef);
+        double extendedColumnMultiplier = 1 + columnRefCount.getFirst() * 10 + 
columnRefCount.getSecond() * 0.1;
+        byteCount *= extendedColumnMultiplier;
         byteCount *= rowCountFactor;
         if (scanOrder != ScanOrder.NONE) {
             // We don't want to make a big difference here. The idea is to 
avoid
@@ -230,12 +246,11 @@ public class PhoenixTableScan extends TableScan 
implements PhoenixRel {
     @Override
     public QueryPlan implement(Implementor implementor) {
         final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
-        PTable pTable = phoenixTable.getTable();
-        TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), 
pTable, HConstants.LATEST_TIMESTAMP, false);
-        implementor.setTableRef(tableRef);
+        TableMapping tableMapping = phoenixTable.tableMapping;
+        implementor.setTableMapping(tableMapping);
         try {
             PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc);
-            ColumnResolver resolver = FromCompiler.getResolver(tableRef);
+            ColumnResolver resolver = 
FromCompiler.getResolver(tableMapping.getTableRef());
             StatementContext context = new StatementContext(stmt, resolver, 
new Scan(), new SequenceManager(stmt));
             SelectStatement select = SelectStatement.SELECT_ONE;
             ImmutableIntList columnRefList = 
implementor.getCurrentContext().columnRefList;
@@ -255,12 +270,14 @@ public class PhoenixTableScan extends TableScan 
implements PhoenixRel {
             if (filter != null && 
!context.getScanRanges().equals(this.scanRanges)) {
                 dynamicFilter = filterExpr;
             }
-            projectColumnFamilies(context.getScan(), 
phoenixTable.mappedColumns, columnRefList);
+            tableMapping.setupScanForExtendedTable(context.getScan(), 
extendedColumnRef, context.getConnection());
+            projectColumnFamilies(context.getScan(), 
tableMapping.getMappedColumns(), columnRefList);
             if (implementor.getCurrentContext().forceProject) {
-                TupleProjector tupleProjector = 
implementor.createTupleProjector();
+                boolean retainPKColumns = 
implementor.getCurrentContext().retainPKColumns;
+                TupleProjector tupleProjector = 
tableMapping.createTupleProjector(retainPKColumns);
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), 
tupleProjector);
-                PTable projectedTable = implementor.createProjectedTable();
-                implementor.setTableRef(new TableRef(projectedTable));
+                PTable projectedTable = 
tableMapping.createProjectedTable(retainPKColumns);
+                implementor.setTableMapping(new TableMapping(projectedTable));
             }
             Integer limit = null;
             OrderBy orderBy = scanOrder == ScanOrder.NONE ?
@@ -269,7 +286,7 @@ public class PhoenixTableScan extends TableScan implements 
PhoenixRel {
                               OrderBy.FWD_ROW_KEY_ORDER_BY
                             : OrderBy.REV_ROW_KEY_ORDER_BY);
             ParallelIteratorFactory iteratorFactory = null;
-            return new ScanPlan(context, select, tableRef, 
RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, 
dynamicFilter);
+            return new ScanPlan(context, select, tableMapping.getTableRef(), 
RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, 
dynamicFilter);
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
index 3d0aa22..677baa5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
@@ -104,7 +104,7 @@ public class PhoenixToEnumerableConverter extends 
ConverterImpl implements Enume
             }
             @Override
             public RowProjector getProjector() {
-                return phoenixImplementor.createRowProjector();
+                return 
phoenixImplementor.getTableMapping().createRowProjector();
             }
             @Override
             public ResultIterator iterator(ParallelScanGrouper scanGrouper)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
index 0d64868..3076890 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
@@ -10,12 +10,12 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.UnnestArrayPlan;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PDataType;
 
 public class PhoenixUncollect extends Uncollect implements PhoenixRel {
@@ -56,8 +56,8 @@ public class PhoenixUncollect extends Uncollect implements 
PhoenixRel {
         } catch (SQLException e) {
             throw new RuntimeException(e);
         }
-        PTable projectedTable = implementor.createProjectedTable();
-        implementor.setTableRef(new TableRef(projectedTable));
+        PTable projectedTable = 
implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns);
+        implementor.setTableMapping(new TableMapping(projectedTable));
         return new UnnestArrayPlan(plan, arrayExpression, false);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
index 9af3f52..83947ae 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexLiteral;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
@@ -119,12 +120,13 @@ public class PhoenixValues extends Values implements 
PhoenixRel {
             TupleProjector projector = implementor.project(exprs);
             literalResult.add(projector.projectResults(baseTuple));
         }
-        PTable projectedTable = implementor.createProjectedTable();
-        implementor.setTableRef(new TableRef(projectedTable));
+        PTable projectedTable = 
implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns);
+        TableMapping tableMapping = new TableMapping(projectedTable);
+        implementor.setTableMapping(tableMapping);
         
         try {
             PhoenixStatement stmt = new PhoenixStatement(phoenixConnection);
-            ColumnResolver resolver = 
FromCompiler.getResolver(implementor.getTableRef());
+            ColumnResolver resolver = 
FromCompiler.getResolver(tableMapping.getTableRef());
             StatementContext context = new StatementContext(stmt, resolver, 
new Scan(), new SequenceManager(stmt));
             return new LiteralResultIterationPlan(literalResult, context, 
SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, 
RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
         } catch (SQLException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
index a6f3a59..e932fad 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java
@@ -44,6 +44,7 @@ public class PhoenixFilterScanMergeRule extends RelOptRule {
         PhoenixTableScan scan = call.rel(1);
         assert scan.filter == null : "predicate should have ensured no filter";
         call.transformTo(PhoenixTableScan.create(
-                scan.getCluster(), scan.getTable(), filter.getCondition(), 
scan.scanOrder));
+                scan.getCluster(), scan.getTable(), filter.getCondition(),
+                scan.scanOrder, scan.extendedColumnRef));
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java
index b38324c..b369e48 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java
@@ -43,7 +43,8 @@ public class PhoenixForwardTableScanRule extends RelOptRule {
         for (RelCollation candidate : scan.getTable().getCollationList()) {
             if (candidate.satisfies(collation)) {
                 RelNode newRel = PhoenixTableScan.create(
-                        scan.getCluster(), scan.getTable(), scan.filter, 
ScanOrder.FORWARD);
+                        scan.getCluster(), scan.getTable(), scan.filter,
+                        ScanOrder.FORWARD, scan.extendedColumnRef);
                 if (sort.offset != null || sort.fetch != null) {
                     newRel = sort.copy(
                             sort.getTraitSet().replace(RelCollations.EMPTY),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java
index 0a6f000..715df8b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java
@@ -47,7 +47,8 @@ public class PhoenixReverseTableScanRule extends RelOptRule {
         for (RelCollation candidate : scan.getTable().getCollationList()) {
             if (CalciteUtils.reverseCollation(candidate).satisfies(collation)) 
{
                 RelNode newRel = PhoenixTableScan.create(
-                        scan.getCluster(), scan.getTable(), scan.filter, 
ScanOrder.REVERSE);
+                        scan.getCluster(), scan.getTable(), scan.filter,
+                        ScanOrder.REVERSE, scan.extendedColumnRef);
                 if (sort.offset != null || sort.fetch != null) {
                     newRel = sort.copy(
                             sort.getTraitSet().replace(RelCollations.EMPTY),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixTableScanColumnRefRule.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixTableScanColumnRefRule.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixTableScanColumnRefRule.java
new file mode 100644
index 0000000..9076593
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixTableScanColumnRefRule.java
@@ -0,0 +1,50 @@
+package org.apache.phoenix.calcite.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.calcite.rel.PhoenixTableScan;
+import com.google.common.base.Predicate;
+
+/**
+ * Planner rule that matches a {@link Project} whose input is a
+ * {@link PhoenixTableScan} over a table mapping that has extended columns.
+ * When it fires, the scan is re-created with an extended column reference
+ * computed from the project's expressions, and the project is copied on top
+ * of the new scan.
+ */
+public class PhoenixTableScanColumnRefRule extends RelOptRule {
+
+    /** Predicate that returns true if a table scan has extended columns. */
+    private static final Predicate<PhoenixTableScan> APPLICABLE_TABLE_SCAN =
+            new Predicate<PhoenixTableScan>() {
+        @Override
+        public boolean apply(PhoenixTableScan phoenixTableScan) {
+            return phoenixTableScan.getTable()
+                    
.unwrap(PhoenixTable.class).tableMapping.hasExtendedColumns();
+        }
+    };
+
+    /** The only instance of this rule; the constructor is private. */
+    public static final PhoenixTableScanColumnRefRule INSTANCE = new 
PhoenixTableScanColumnRefRule();
+
+    /**
+     * Registers the operand pattern: a Project directly on top of a
+     * PhoenixTableScan accepted by {@link #APPLICABLE_TABLE_SCAN}.
+     */
+    private PhoenixTableScanColumnRefRule() {
+        super(
+            operand(Project.class,
+                operand(PhoenixTableScan.class, null, APPLICABLE_TABLE_SCAN, 
any())));
+    }
+
+    /**
+     * Computes the extended column reference for the project's expressions
+     * and, unless that set already contains the scan's current
+     * {@code extendedColumnRef}, replaces the project's input with a new scan
+     * built from that set. The new scan keeps the original cluster, table,
+     * filter and scan order.
+     *
+     * @param call the rule call; rel(0) is the Project, rel(1) the scan
+     */
+    @Override
+    public void onMatch(RelOptRuleCall call) {
+        Project project = call.rel(0);
+        PhoenixTableScan scan = call.rel(1);
+        // Extended columns referenced by the project's expressions.
+        ImmutableBitSet bitSet = scan.getTable().unwrap(PhoenixTable.class)
+                .tableMapping.getExtendedColumnRef(project.getProjects());
+        // No rewrite when the project's set already covers the scan's set.
+        // NOTE(review): relies on ImmutableBitSet.contains being a superset
+        // test - confirm against the Calcite version in use.
+        if (bitSet.contains(scan.extendedColumnRef)) {
+            return;
+        }
+        
+        // Same project (expressions, row type, traits) over the new scan.
+        call.transformTo(
+                project.copy(
+                        project.getTraitSet(),
+                        PhoenixTableScan.create(
+                                scan.getCluster(), scan.getTable(),
+                                scan.filter, scan.scanOrder, bitSet),
+                        project.getProjects(),
+                        project.getRowType()));
+    }
+}

Reply via email to