PHOENIX-2128 Implement using local index with missing columns
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aa0bf583 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aa0bf583 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aa0bf583 Branch: refs/heads/calcite Commit: aa0bf5834656909e6814204992b6fec19a166b02 Parents: c9f117e Author: maryannxue <maryann....@gmail.com> Authored: Sun Apr 3 13:40:16 2016 -0400 Committer: maryannxue <maryann....@gmail.com> Committed: Sun Apr 3 13:40:16 2016 -0400 ---------------------------------------------------------------------- .../phoenix/calcite/CalciteLocalIndexIT.java | 18 + .../apache/phoenix/calcite/CalciteUtils.java | 16 +- .../phoenix/calcite/CorrelateVariableImpl.java | 30 ++ .../apache/phoenix/calcite/PhoenixSchema.java | 6 +- .../apache/phoenix/calcite/PhoenixTable.java | 64 ++-- .../apache/phoenix/calcite/TableMapping.java | 355 +++++++++++++++++++ .../calcite/jdbc/PhoenixPrepareImpl.java | 2 + .../calcite/rel/PhoenixAbstractAggregate.java | 6 +- .../calcite/rel/PhoenixAbstractProject.java | 6 +- .../calcite/rel/PhoenixClientAggregate.java | 2 +- .../phoenix/calcite/rel/PhoenixClientJoin.java | 10 +- .../calcite/rel/PhoenixClientSemiJoin.java | 6 +- .../phoenix/calcite/rel/PhoenixClientSort.java | 2 +- .../phoenix/calcite/rel/PhoenixCorrelate.java | 13 +- .../phoenix/calcite/rel/PhoenixLimit.java | 2 +- .../apache/phoenix/calcite/rel/PhoenixRel.java | 11 +- .../calcite/rel/PhoenixRelImplementorImpl.java | 75 +--- .../phoenix/calcite/rel/PhoenixServerJoin.java | 8 +- .../calcite/rel/PhoenixServerSemiJoin.java | 8 +- .../phoenix/calcite/rel/PhoenixTableScan.java | 61 ++-- .../rel/PhoenixToEnumerableConverter.java | 2 +- .../phoenix/calcite/rel/PhoenixUncollect.java | 6 +- .../phoenix/calcite/rel/PhoenixValues.java | 8 +- .../rules/PhoenixFilterScanMergeRule.java | 3 +- .../rules/PhoenixForwardTableScanRule.java | 3 +- .../rules/PhoenixReverseTableScanRule.java | 3 +- .../rules/PhoenixTableScanColumnRefRule.java | 50 +++ .../apache/phoenix/execute/CorrelatePlan.java | 2 +- .../apache/phoenix/execute/RuntimeContext.java | 17 +- .../phoenix/execute/RuntimeContextImpl.java | 57 +-- .../apache/phoenix/execute/TupleProjector.java | 12 +- .../CorrelateVariableFieldAccessExpression.java | 2 +- .../java/org/apache/phoenix/util/IndexUtil.java | 6 + .../phoenix/calcite/ToExpressionTest.java | 11 +- .../phoenix/execute/CorrelatePlanTest.java | 27 +- 35 files changed, 659 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java index 90bfc3b..02cf2a1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java @@ -239,6 +239,24 @@ public class CalciteLocalIndexIT extends BaseCalciteIndexIT { {"1000", 1001}, {"1001", 1002}})*/ .close(); + + start(props).sql("select * from " + MULTI_TENANT_VIEW1 + " where col0 = 1000") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER], COL1=[$2], COL2=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[=(CAST($0):INTEGER, 1000)], extendedColumns=[{2, 3}])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][] { + {"0999", 1000, 1001, 1002}})*/ + .close(); + + start(props).sql("select id, col0, col2 from " + MULTI_TENANT_VIEW1 + " where col0 = 1000") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER], COL2=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[=(CAST($0):INTEGER, 1000)], extendedColumns=[{3}])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][] { + {"0999", 1000, 1002}})*/ + .close(); props.setProperty("TenantId", "20"); start(props).sql("select * from " + MULTI_TENANT_VIEW2 + " where id = '0765'") http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java index 15df943..b41db0e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java @@ -231,7 +231,7 @@ public class CalciteUtils { public Expression newExpression(RexNode node, Implementor implementor) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); try { - return ComparisonExpression.create(CompareOp.EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable()); + return ComparisonExpression.create(CompareOp.EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable()); } catch (SQLException e) { throw new RuntimeException(e); } @@ -244,7 +244,7 @@ public class CalciteUtils { public Expression newExpression(RexNode node, Implementor implementor) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); try { - return ComparisonExpression.create(CompareOp.NOT_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable()); + return ComparisonExpression.create(CompareOp.NOT_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable()); } catch (SQLException e) { throw new RuntimeException(e); } @@ -257,7 +257,7 @@ public class CalciteUtils { public Expression newExpression(RexNode node, Implementor implementor) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); try { - return ComparisonExpression.create(CompareOp.GREATER, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable()); + return ComparisonExpression.create(CompareOp.GREATER, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable()); } catch (SQLException e) { throw new RuntimeException(e); } @@ -270,7 +270,7 @@ public class CalciteUtils { public Expression newExpression(RexNode node, Implementor implementor) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); try { - return ComparisonExpression.create(CompareOp.GREATER_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable()); + return ComparisonExpression.create(CompareOp.GREATER_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable()); } catch (SQLException e) { throw new RuntimeException(e); } @@ -283,7 +283,7 @@ public class CalciteUtils { public Expression newExpression(RexNode node, Implementor implementor) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); try { - return ComparisonExpression.create(CompareOp.LESS, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable()); + return ComparisonExpression.create(CompareOp.LESS, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable()); } catch (SQLException e) { throw new RuntimeException(e); } @@ -296,7 +296,7 @@ public class CalciteUtils { public Expression newExpression(RexNode node, Implementor implementor) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); try { - return ComparisonExpression.create(CompareOp.LESS_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableRef().getTable().rowKeyOrderOptimizable()); + return ComparisonExpression.create(CompareOp.LESS_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable()); } catch (SQLException e) { throw new RuntimeException(e); } @@ -871,10 +871,10 @@ public class CalciteUtils { PDataType fromDataType = childExpr.getDataType(); Expression expr = childExpr; - if(fromDataType != null && implementor.getTableRef().getTable().getType() != PTableType.INDEX) { + if(fromDataType != null && implementor.getTableMapping().getPTable().getType() != PTableType.INDEX) { expr = convertToRoundExpressionIfNeeded(fromDataType, targetDataType, childExpr); } - return CoerceExpression.create(expr, targetDataType, SortOrder.getDefault(), expr.getMaxLength(), implementor.getTableRef().getTable().rowKeyOrderOptimizable()); + return CoerceExpression.create(expr, targetDataType, SortOrder.getDefault(), expr.getMaxLength(), implementor.getTableMapping().getPTable().rowKeyOrderOptimizable()); } @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/CorrelateVariableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CorrelateVariableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CorrelateVariableImpl.java new file mode 100644 index 0000000..bde7ddc --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CorrelateVariableImpl.java @@ -0,0 +1,30 @@ +package org.apache.phoenix.calcite; + +import org.apache.phoenix.execute.RuntimeContext.CorrelateVariable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.schema.tuple.Tuple; + +public class CorrelateVariableImpl implements CorrelateVariable { + private final TableMapping tableMapping; + private Tuple value; + + public CorrelateVariableImpl(TableMapping tableMapping) { + this.tableMapping = tableMapping; + } + + @Override + public Expression newExpression(int index) { + return tableMapping.newColumnExpression(index); + } + + @Override + public Tuple getValue() { + return value; + } + + @Override + public void setValue(Tuple value) { + this.value = value; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index 5e26017..683a16b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@ -226,7 +226,7 @@ public class PhoenixSchema implements Schema { try { for (Table table : tables.values()) { if (table instanceof PhoenixTable) { - PTable pTable = ((PhoenixTable) table).pTable; + PTable pTable = ((PhoenixTable) table).tableMapping.getPTable(); for (PTable index : pTable.getIndexes()) { addMaterialization(index, path, calciteSchema); } @@ -247,9 +247,11 @@ public class PhoenixSchema implements Schema { private void addMaterialization(PTable index, List<String> path, CalciteSchema calciteSchema) throws SQLException { index = fixTableMultiTenancy(index); + PhoenixTable table = new PhoenixTable(pc, index); + tables.put(index.getTableName().getString(), table); StringBuffer sb = new StringBuffer(); sb.append("SELECT"); - for (PColumn column : PhoenixTable.getMappedColumns(index)) { + for (PColumn column : table.getColumns()) { String indexColumnName = column.getName().getString(); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); sb.append(",").append(SchemaUtil.getEscapedFullColumnName(dataColumnName)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index 1fcc9d3..7b5d287 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -2,7 +2,6 @@ package org.apache.phoenix.calcite; import java.io.IOException; import java.sql.SQLException; -import java.util.Iterator; import java.util.List; import org.apache.calcite.plan.RelOptTable; @@ -33,15 +32,18 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.iterate.BaseResultIterators; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.parse.ColumnDef; +import org.apache.phoenix.parse.NamedTableNode; +import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.stats.StatisticsUtil; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -52,47 +54,32 @@ import com.google.common.collect.Lists; * Phoenix. */ public class PhoenixTable extends AbstractTable implements TranslatableTable { - public final PTable pTable; - public final List<PColumn> mappedColumns; + public final TableMapping tableMapping; public final ImmutableBitSet pkBitSet; public final RelCollation collation; public final long byteCount; public final long rowCount; public final PhoenixConnection pc; - - public static List<PColumn> getMappedColumns(PTable pTable) { - if (pTable.getBucketNum() == null - && !pTable.isMultiTenant() - && pTable.getViewIndexId() == null) { - return pTable.getColumns(); - } - - List<PColumn> columns = Lists.newArrayList(pTable.getColumns()); - if (pTable.getViewIndexId() != null) { - for (Iterator<PColumn> iter = columns.iterator(); iter.hasNext();) { - if (iter.next().getName().getString().equals(MetaDataUtil.VIEW_INDEX_ID_COLUMN_NAME)) { - iter.remove(); - break; - } - } - } - if (pTable.isMultiTenant()) { - columns.remove(pTable.getBucketNum() == null ? 0 : 1); - } - if (pTable.getBucketNum() != null) { - columns.remove(0); - } - return columns; - } - public PhoenixTable(PhoenixConnection pc, PTable pTable) { + public PhoenixTable(PhoenixConnection pc, PTable pTable) throws SQLException { this.pc = Preconditions.checkNotNull(pc); - this.pTable = Preconditions.checkNotNull(pTable); - this.mappedColumns = getMappedColumns(pTable); + PTable extendedTable = null; + if (pTable.getIndexType() == IndexType.LOCAL) { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create(null, + TableName.create(pTable.getParentSchemaName().getString(), + pTable.getParentTableName().getString()), + ImmutableList.<ColumnDef>of()), pc); + final List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + extendedTable = tables.get(0).getTable(); + } + this.tableMapping = extendedTable == null ? new TableMapping(pTable) : new TableMapping(pTable, extendedTable); List<Integer> pkPositions = Lists.<Integer> newArrayList(); List<RelFieldCollation> fieldCollations = Lists.<RelFieldCollation> newArrayList(); - for (int i = 0; i < mappedColumns.size(); i++) { - PColumn column = mappedColumns.get(i); + final List<PColumn> columns = tableMapping.getMappedColumns(); + for (int i = 0; i < columns.size(); i++) { + PColumn column = columns.get(i); if (SchemaUtil.isPKColumn(column)) { SortOrder sortOrder = column.getSortOrder(); pkPositions.add(i); @@ -133,16 +120,17 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { } } - public PTable getTable() { - return pTable; + public List<PColumn> getColumns() { + return tableMapping.getMappedColumns(); } @SuppressWarnings("rawtypes") @Override public RelDataType getRowType(RelDataTypeFactory typeFactory) { final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); - for (int i = 0; i < mappedColumns.size(); i++) { - PColumn pColumn = mappedColumns.get(i); + final List<PColumn> columns = tableMapping.getMappedColumns(); + for (int i = 0; i < columns.size(); i++) { + PColumn pColumn = columns.get(i); final PDataType baseType = pColumn.getDataType().isArrayType() ? PDataType.fromTypeId(pColumn.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE) http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java new file mode 100644 index 0000000..839370d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java @@ -0,0 +1,355 @@ +package org.apache.phoenix.calcite; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.ExpressionProjector; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.TupleProjectionCompiler; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.execute.TupleProjector; +import org.apache.phoenix.expression.ColumnExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.ColumnRef; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.ProjectedColumn; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; +import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class TableMapping { + private final TableRef tableRef; + private final List<PColumn> mappedColumns; + private final int extendedColumnsOffset; + private final TableRef extendedTableRef; + + public TableMapping(PTable table) { + this.tableRef = new TableRef(table); + this.mappedColumns = getMappedColumns(table); + this.extendedColumnsOffset = mappedColumns.size(); + this.extendedTableRef = null; + } + + public TableMapping(PTable table, PTable dataTable) throws SQLException { + this.tableRef = new TableRef(table); + this.mappedColumns = Lists.newArrayList(); + this.mappedColumns.addAll(getMappedColumns(table)); + this.extendedColumnsOffset = mappedColumns.size(); + Set<String> names = Sets.newHashSet(); + for (PColumn column : this.mappedColumns) { + names.add(column.getName().getString()); + } + TableRef dataTableRef = new TableRef(dataTable); + List<PColumn> projectedColumns = new ArrayList<PColumn>(); + for (PColumn sourceColumn : dataTable.getColumns()) { + if (!SchemaUtil.isPKColumn(sourceColumn)) { + String colName = IndexUtil.getIndexColumnName(sourceColumn); + if (!names.contains(colName)) { + ColumnRef sourceColumnRef = + new ColumnRef(dataTableRef, sourceColumn.getPosition()); + PColumn column = new ProjectedColumn(PNameFactory.newName(colName), + sourceColumn.getFamilyName(), projectedColumns.size(), + sourceColumn.isNullable(), sourceColumnRef); + projectedColumns.add(column); + } + } + } + this.mappedColumns.addAll(projectedColumns); + PTable extendedTable = PTableImpl.makePTable(dataTable.getTenantId(), + TupleProjectionCompiler.PROJECTED_TABLE_SCHEMA, dataTable.getName(), + PTableType.PROJECTED, null, dataTable.getTimeStamp(), + dataTable.getSequenceNumber(), dataTable.getPKName(), null, + projectedColumns, null, null, Collections.<PTable>emptyList(), + dataTable.isImmutableRows(), Collections.<PName>emptyList(), null, null, + dataTable.isWALDisabled(), false, dataTable.getStoreNulls(), + dataTable.getViewType(), null, null, dataTable.rowKeyOrderOptimizable(), + dataTable.isTransactional(), dataTable.getUpdateCacheFrequency(), + dataTable.getIndexDisableTimestamp()); + this.extendedTableRef = new TableRef(extendedTable); + } + + public TableRef getTableRef() { + return tableRef; + } + + public PTable getPTable() { + return tableRef.getTable(); + } + + public List<PColumn> getMappedColumns() { + return mappedColumns; + } + + public boolean hasExtendedColumns() { + return extendedTableRef != null; + } + + public ColumnExpression newColumnExpression(int index) { + ColumnRef colRef = new ColumnRef( + index < extendedColumnsOffset ? tableRef : extendedTableRef, + this.mappedColumns.get(index).getPosition()); + return colRef.newColumnExpression(); + } + + public ImmutableBitSet getDefaultExtendedColumnRef() { + return ImmutableBitSet.range(extendedColumnsOffset, mappedColumns.size()); + } + + public ImmutableBitSet getExtendedColumnRef(List<RexNode> exprs) { + if (!hasExtendedColumns()) { + return ImmutableBitSet.of(); + } + + ImmutableBitSet.Builder builder = ImmutableBitSet.builder(); + for (RexNode expr : exprs) { + builder.addAll(InputFinder.analyze(expr).inputBitSet.build()); + } + for (int i = 0; i < extendedColumnsOffset; i++) { + builder.clear(i); + } + return builder.build(); + } + + public Pair<Integer, Integer> getExtendedColumnReferenceCount(ImmutableBitSet columnRef) { + Set<String> cf = Sets.newHashSet(); + int columnCount = 0; + for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) { + if (columnRef.get(i)) { + PColumn dataColumn = ((ProjectedColumn) mappedColumns.get(i)) + .getSourceColumnRef().getColumn(); + cf.add(dataColumn.getFamilyName().getString()); + columnCount++; + } + } + return new Pair<Integer, Integer>(cf.size(), columnCount); + } + + public PTable createProjectedTable(boolean retainPKColumns) { + List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); + List<PColumn> columns = retainPKColumns ? + tableRef.getTable().getColumns() : mappedColumns.subList(0, extendedColumnsOffset); + for (PColumn column : columns) { + sourceColumnRefs.add(new ColumnRef(tableRef, column.getPosition())); + } + if (extendedColumnsOffset < mappedColumns.size()) { + for (PColumn column : mappedColumns.subList(extendedColumnsOffset, mappedColumns.size())) { + sourceColumnRefs.add(new ColumnRef(extendedTableRef, column.getPosition())); + } + } + + try { + return TupleProjectionCompiler.createProjectedTable(tableRef, sourceColumnRefs, retainPKColumns); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public TupleProjector createTupleProjector(boolean retainPKColumns) { + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (int i = 0; i < mappedColumns.size(); i++) { + if (!SchemaUtil.isPKColumn(mappedColumns.get(i)) || !retainPKColumns) { + Expression expr = newColumnExpression(i); + exprs.add(expr); + builder.addField(expr); + } + } + + return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + } + + public RowProjector createRowProjector() { + List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + Expression expr = newColumnExpression(i); // Do not use column.position() here. + columnProjectors.add(new ExpressionProjector(column.getName().getString(), tableRef.getTable().getName().getString(), expr, false)); + } + // TODO get estimate row size + return new RowProjector(columnProjectors, 0, false); + } + + public void setupScanForExtendedTable(Scan scan, ImmutableBitSet extendedColumnRef, + PhoenixConnection connection) throws SQLException { + if (extendedTableRef == null || extendedColumnRef.isEmpty()) { + return; + } + + TableRef dataTableRef = null; + List<PColumn> dataColumns = Lists.newArrayList(); + KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); + List<Expression> exprs = Lists.<Expression> newArrayList(); + for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) { + ProjectedColumn column = (ProjectedColumn) mappedColumns.get(i); + builder.addField(column); + if (extendedColumnRef.get(i)) { + dataColumns.add(column.getSourceColumnRef().getColumn()); + exprs.add(column.getSourceColumnRef().newColumnExpression()); + if (dataTableRef == null) { + dataTableRef = column.getSourceColumnRef().getTableRef(); + } + } else { + exprs.add(LiteralExpression.newConstant(null)); + } + } + if (dataColumns.isEmpty()) { + return; + } + + // Set data columns to be join back from data table. + serializeDataTableColumnsToJoin(scan, dataColumns); + // Set tuple projector of the data columns. + TupleProjector projector = new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); + TupleProjector.serializeProjectorIntoScan(scan, projector, IndexUtil.INDEX_PROJECTOR); + PTable dataTable = dataTableRef.getTable(); + // Set index maintainer of the local index. + serializeIndexMaintainerIntoScan(scan, dataTable, connection); + // Set view constants if exists. + serializeViewConstantsIntoScan(scan, dataTable); + } + + private static void serializeDataTableColumnsToJoin(Scan scan, List<PColumn> dataColumns) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + DataOutputStream output = new DataOutputStream(stream); + WritableUtils.writeVInt(output, dataColumns.size()); + for (PColumn column : dataColumns) { + Bytes.writeByteArray(output, column.getFamilyName().getBytes()); + Bytes.writeByteArray(output, column.getName().getBytes()); + } + scan.setAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN, stream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private void serializeIndexMaintainerIntoScan(Scan scan, PTable dataTable, PhoenixConnection connection) throws SQLException { + PName name = getPTable().getName(); + List<PTable> indexes = Lists.newArrayListWithExpectedSize(1); + for (PTable index : dataTable.getIndexes()) { + if (index.getName().equals(name) && index.getIndexType() == IndexType.LOCAL) { + indexes.add(index); + break; + } + } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + IndexMaintainer.serialize(dataTable, ptr, indexes, connection); + scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + if (dataTable.isTransactional()) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction()); + } + } + + private static void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) { + int dataPosOffset = (dataTable.getBucketNum() != null ? 1 : 0) + (dataTable.isMultiTenant() ? 1 : 0); + int nViewConstants = 0; + if (dataTable.getType() == PTableType.VIEW) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + List<PColumn> dataPkColumns = dataTable.getPKColumns(); + for (int i = dataPosOffset; i < dataPkColumns.size(); i++) { + PColumn dataPKColumn = dataPkColumns.get(i); + if (dataPKColumn.getViewConstant() != null) { + nViewConstants++; + } + } + if (nViewConstants > 0) { + byte[][] viewConstants = new byte[nViewConstants][]; + int j = 0; + for (int i = dataPosOffset; i < dataPkColumns.size(); i++) { + PColumn dataPkColumn = dataPkColumns.get(i); + if (dataPkColumn.getViewConstant() != null) { + if (IndexUtil.getViewConstantValue(dataPkColumn, ptr)) { + viewConstants[j++] = ByteUtil.copyKeyBytesIfNecessary(ptr); + } else { + throw new IllegalStateException(); + } + } + } + serializeViewConstantsIntoScan(viewConstants, scan); + } + } + } + + private static void serializeViewConstantsIntoScan(byte[][] viewConstants, Scan scan) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + try { + DataOutputStream output = new DataOutputStream(stream); + WritableUtils.writeVInt(output, viewConstants.length); + for (byte[] viewConstant : viewConstants) { + Bytes.writeByteArray(output, viewConstant); + } + scan.setAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS, stream.toByteArray()); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + stream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private static List<PColumn> getMappedColumns(PTable pTable) { + if (pTable.getBucketNum() == null + && !pTable.isMultiTenant() + && pTable.getViewIndexId() == null) { + return pTable.getColumns(); + } + + List<PColumn> columns = Lists.newArrayList(pTable.getColumns()); + if (pTable.getViewIndexId() != null) { + for (Iterator<PColumn> iter = columns.iterator(); iter.hasNext();) { + if (iter.next().getName().getString().equals(MetaDataUtil.VIEW_INDEX_ID_COLUMN_NAME)) { + iter.remove(); + break; + } + } + } + if (pTable.isMultiTenant()) { + columns.remove(pTable.getBucketNum() == null ? 0 : 1); + } + if (pTable.getBucketNum() != null) { + columns.remove(0); + } + + return columns; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java index ae09b42..7863523 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java @@ -38,6 +38,7 @@ import org.apache.phoenix.calcite.rules.PhoenixMergeSortUnionRule; import org.apache.phoenix.calcite.rules.PhoenixOrderedAggregateRule; import org.apache.phoenix.calcite.rules.PhoenixReverseTableScanRule; import org.apache.phoenix.calcite.rules.PhoenixSortServerJoinTransposeRule; +import org.apache.phoenix.calcite.rules.PhoenixTableScanColumnRefRule; import com.google.common.base.Function; @@ -93,6 +94,7 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl { planner.addRule(rule); } planner.addRule(PhoenixFilterScanMergeRule.INSTANCE); + planner.addRule(PhoenixTableScanColumnRefRule.INSTANCE); planner.addRule(PhoenixCompactClientSortRule.INSTANCE); planner.addRule(PhoenixJoinSingleValueAggregateMergeRule.INSTANCE); planner.addRule(PhoenixMergeSortUnionRule.INSTANCE); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java index 16c97cc..dff5d4e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java @@ -18,6 +18,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.StatementContext; @@ -32,7 +33,6 @@ import org.apache.phoenix.expression.function.AggregateFunction; import org.apache.phoenix.expression.function.SingleAggregateFunction; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.RowKeyValueAccessor; -import org.apache.phoenix.schema.TableRef; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -193,8 +193,8 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe } TupleProjector tupleProjector = implementor.project(exprs); - PTable projectedTable = implementor.createProjectedTable(); - implementor.setTableRef(new TableRef(projectedTable)); + PTable projectedTable = implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns); + implementor.setTableMapping(new TableMapping(projectedTable)); return new TupleProjectionPlan(plan, tupleProjector, null); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java index 464109d..6e09fa5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java @@ -15,10 +15,10 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import com.google.common.collect.Lists; @@ -55,8 +55,8 @@ abstract public class PhoenixAbstractProject extends Project implements PhoenixR exprs.add(CalciteUtils.toExpression(project, implementor)); } TupleProjector tupleProjector = implementor.project(exprs); - PTable projectedTable = implementor.createProjectedTable(); - implementor.setTableRef(new TableRef(projectedTable)); + PTable projectedTable = implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns); + implementor.setTableMapping(new TableMapping(projectedTable)); return tupleProjector; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java index 5aaff2f..680e871 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java @@ -63,7 +63,7 @@ public class PhoenixClientAggregate extends PhoenixAbstractAggregate { QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); implementor.popContext(); - TableRef tableRef = implementor.getTableRef(); + TableRef tableRef = implementor.getTableMapping().getTableRef(); PhoenixStatement stmt = plan.getContext().getStatement(); StatementContext context; try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java index 2df5f74..53c0cf4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java @@ -19,6 +19,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; @@ -115,12 +116,12 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin { implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns && getJoinType() != JoinRelType.FULL, true, getColumnRefList(0))); QueryPlan leftPlan = implementInput(implementor, 0, leftExprs); - PTable leftTable = implementor.getTableRef().getTable(); + PTable leftTable = implementor.getTableMapping().getPTable(); implementor.popContext(); implementor.pushContext(new ImplementorContext(false, true, getColumnRefList(1))); QueryPlan rightPlan = implementInput(implementor, 1, rightExprs); - PTable rightTable = implementor.getTableRef().getTable(); + PTable rightTable = implementor.getTableMapping().getPTable(); implementor.popContext(); JoinType type = CalciteUtils.convertJoinType(getJoinType()); @@ -130,8 +131,9 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin { } catch (SQLException e) { throw new RuntimeException(e); } - TableRef tableRef = new TableRef(joinedTable); - implementor.setTableRef(tableRef); + TableMapping tableMapping = new TableMapping(joinedTable); + implementor.setTableMapping(tableMapping); + TableRef tableRef = tableMapping.getTableRef(); ColumnResolver resolver; try { resolver = FromCompiler.getResolver(tableRef); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java index 31dfc3d..cec1181 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java @@ -16,6 +16,7 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableIntList; import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.QueryPlan; @@ -99,7 +100,8 @@ public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin implements implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns && getJoinType() != JoinRelType.FULL, true, getColumnRefList(0))); QueryPlan leftPlan = implementInput(implementor, 0, leftExprs); - TableRef joinedTable = implementor.getTableRef(); + TableMapping tableMapping = implementor.getTableMapping(); + TableRef joinedTable = tableMapping.getTableRef(); implementor.popContext(); implementor.pushContext(new ImplementorContext(false, true, getColumnRefList(1))); @@ -107,7 +109,7 @@ public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin implements implementor.popContext(); JoinType type = JoinType.Semi; - implementor.setTableRef(joinedTable); + implementor.setTableMapping(tableMapping); PhoenixStatement stmt = leftPlan.getContext().getStatement(); ColumnResolver resolver = leftPlan.getContext().getResolver(); StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java index 59fb639..7c73530 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java @@ -59,7 +59,7 @@ public class PhoenixClientSort extends PhoenixAbstractSort { QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); - TableRef tableRef = implementor.getTableRef(); + TableRef tableRef = implementor.getTableMapping().getTableRef(); PhoenixStatement stmt = plan.getContext().getStatement(); StatementContext context; try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java index 496dfdb..d70c5be 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java @@ -17,13 +17,14 @@ import org.apache.calcite.sql.SemiJoinType; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.CorrelateVariableImpl; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; import org.apache.phoenix.compile.JoinCompiler; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.CorrelatePlan; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import com.google.common.base.Supplier; @@ -73,14 +74,14 @@ public class PhoenixCorrelate extends Correlate implements PhoenixRel { public QueryPlan implement(Implementor implementor) { implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns, true, ImmutableIntList.identity(getLeft().getRowType().getFieldCount()))); QueryPlan leftPlan = implementor.visitInput(0, (PhoenixRel) getLeft()); - PTable leftTable = implementor.getTableRef().getTable(); + PTable leftTable = implementor.getTableMapping().getPTable(); implementor.popContext(); - implementor.getRuntimeContext().defineCorrelateVariable(getCorrelVariable(), implementor.getTableRef()); + implementor.getRuntimeContext().defineCorrelateVariable(getCorrelVariable(), new CorrelateVariableImpl(implementor.getTableMapping())); implementor.pushContext(new ImplementorContext(false, true, ImmutableIntList.identity(getRight().getRowType().getFieldCount()))); QueryPlan rightPlan = implementor.visitInput(1, (PhoenixRel) getRight()); - PTable rightTable = implementor.getTableRef().getTable(); + PTable rightTable = implementor.getTableMapping().getPTable(); implementor.popContext(); JoinType type = CalciteUtils.convertSemiJoinType(getJoinType()); @@ -90,8 +91,8 @@ public class PhoenixCorrelate extends Correlate implements PhoenixRel { } catch (SQLException e) { throw new RuntimeException(e); } - TableRef tableRef = new TableRef(joinedTable); - implementor.setTableRef(tableRef); + TableMapping tableMapping = new TableMapping(joinedTable); + implementor.setTableMapping(tableMapping); return new CorrelatePlan(leftPlan, rightPlan, getCorrelVariable(), type, false, implementor.getRuntimeContext(), joinedTable, http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java index 3edcf60..236af84 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java @@ -89,7 +89,7 @@ public class PhoenixLimit extends SingleRel implements PhoenixRel { } return new ClientScanPlan(plan.getContext(), plan.getStatement(), - implementor.getTableRef(), RowProjector.EMPTY_PROJECTOR, + implementor.getTableMapping().getTableRef(), RowProjector.EMPTY_PROJECTOR, fetchValue, null, OrderBy.EMPTY_ORDER_BY, plan); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java index f6b0e97..5d8b7b0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java @@ -6,9 +6,9 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.metadata.RelMetadataProvider; import org.apache.calcite.util.ImmutableIntList; import org.apache.phoenix.calcite.PhoenixSequence; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.SequenceValueExpression; import org.apache.phoenix.execute.RuntimeContext; @@ -16,8 +16,6 @@ import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.parse.SequenceValueParseNode; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; /** @@ -75,15 +73,12 @@ public interface PhoenixRel extends RelNode { Expression newFieldAccessExpression(String variableId, int index, PDataType type); SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op); RuntimeContext getRuntimeContext(); - void setTableRef(TableRef tableRef); - TableRef getTableRef(); + void setTableMapping(TableMapping tableMapping); + TableMapping getTableMapping(); void setSequenceManager(SequenceManager sequenceManager); void pushContext(ImplementorContext context); ImplementorContext popContext(); ImplementorContext getCurrentContext(); - PTable createProjectedTable(); - TupleProjector createTupleProjector(); - RowProjector createRowProjector(); TupleProjector project(List<Expression> exprs); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index a33cd18..759f09b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@ -5,18 +5,12 @@ import java.util.Collections; import java.util.List; import java.util.Stack; -import org.apache.hadoop.hbase.HConstants; -import org.apache.phoenix.calcite.CalciteUtils; import org.apache.phoenix.calcite.PhoenixSequence; -import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext; -import org.apache.phoenix.compile.ColumnProjector; -import org.apache.phoenix.compile.ExpressionProjector; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.SequenceManager; import org.apache.phoenix.compile.SequenceValueExpression; -import org.apache.phoenix.compile.TupleProjectionCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.execute.RuntimeContext; import org.apache.phoenix.execute.TupleProjector; @@ -26,7 +20,6 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SequenceValueParseNode; import org.apache.phoenix.parse.TableName; -import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; @@ -35,19 +28,14 @@ import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.util.SchemaUtil; - import com.google.common.collect.Lists; public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { private final RuntimeContext runtimeContext; - private TableRef tableRef; - private List<PColumn> mappedColumns; private Stack<ImplementorContext> contextStack; private SequenceManager sequenceManager; + private TableMapping tableMapping; public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) { this.runtimeContext = runtimeContext; @@ -61,14 +49,13 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { @Override public ColumnExpression newColumnExpression(int index) { - ColumnRef colRef = new ColumnRef(this.tableRef, this.mappedColumns.get(index).getPosition()); - return colRef.newColumnExpression(); + return tableMapping.newColumnExpression(index); } @SuppressWarnings("rawtypes") @Override public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { - Expression fieldAccessExpr = runtimeContext.newCorrelateVariableReference(variableId, index); + Expression fieldAccessExpr = runtimeContext.getCorrelateVariable(variableId).newExpression(index); return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr); } @@ -89,14 +76,13 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { } @Override - public void setTableRef(TableRef tableRef) { - this.tableRef = tableRef; - this.mappedColumns = PhoenixTable.getMappedColumns(tableRef.getTable()); + public void setTableMapping(TableMapping tableMapping) { + this.tableMapping = tableMapping; } @Override - public TableRef getTableRef() { - return this.tableRef; + public TableMapping getTableMapping() { + return this.tableMapping; } @Override @@ -120,49 +106,6 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { } @Override - public PTable createProjectedTable() { - List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); - List<PColumn> columns = getCurrentContext().retainPKColumns ? - getTableRef().getTable().getColumns() : mappedColumns; - for (PColumn column : columns) { - sourceColumnRefs.add(new ColumnRef(getTableRef(), column.getPosition())); - } - - try { - return TupleProjectionCompiler.createProjectedTable(getTableRef(), sourceColumnRefs, getCurrentContext().retainPKColumns); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - @Override - public TupleProjector createTupleProjector() { - KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); - List<Expression> exprs = Lists.<Expression> newArrayList(); - for (PColumn column : mappedColumns) { - if (!SchemaUtil.isPKColumn(column) || !getCurrentContext().retainPKColumns) { - Expression expr = new ColumnRef(tableRef, column.getPosition()).newColumnExpression(); - exprs.add(expr); - builder.addField(expr); - } - } - - return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()])); - } - - @Override - public RowProjector createRowProjector() { - List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); - for (int i = 0; i < mappedColumns.size(); i++) { - PColumn column = mappedColumns.get(i); - Expression expr = newColumnExpression(i); // Do not use column.position() here. - columnProjectors.add(new ExpressionProjector(column.getName().getString(), getTableRef().getTable().getName().getString(), expr, false)); - } - // TODO get estimate row size - return new RowProjector(columnProjectors, 0, false); - } - - @Override public TupleProjector project(List<Expression> exprs) { KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); List<PColumn> columns = Lists.<PColumn>newArrayList(); @@ -180,7 +123,7 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { null, null, columns, null, null, Collections.<PTable>emptyList(), false, Collections.<PName>emptyList(), null, null, false, false, false, null, null, null, true, false, 0, 0); - this.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false)); + this.setTableMapping(new TableMapping(pTable)); } catch (SQLException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java index 2f09a9d..8690377 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java @@ -17,6 +17,7 @@ import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; import org.apache.phoenix.compile.JoinCompiler; import org.apache.phoenix.compile.QueryPlan; @@ -28,7 +29,6 @@ import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -111,12 +111,12 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin { implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns, true, getColumnRefList(0))); QueryPlan leftPlan = implementInput(implementor, 0, null); - PTable leftTable = implementor.getTableRef().getTable(); + PTable leftTable = implementor.getTableMapping().getPTable(); implementor.popContext(); implementor.pushContext(new ImplementorContext(false, true, getColumnRefList(1))); QueryPlan rightPlan = implementInput(implementor, 1, rightExprs); - PTable rightTable = implementor.getTableRef().getTable(); + PTable rightTable = implementor.getTableMapping().getPTable(); implementor.popContext(); JoinType type = CalciteUtils.convertJoinType(getJoinType()); @@ -126,7 +126,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin { } catch (SQLException e) { throw new RuntimeException(e); } - implementor.setTableRef(new TableRef(joinedTable)); + implementor.setTableMapping(new TableMapping(joinedTable)); // Compile left conditions against the joined table due to implementation of HashJoinRegionScanner. for (Iterator<Integer> iter = joinInfo.leftKeys.iterator(); iter.hasNext();) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java index 74c3e4d..2b133aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java @@ -15,6 +15,7 @@ import org.apache.calcite.rel.core.SemiJoin; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableIntList; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.HashJoinPlan; @@ -24,7 +25,6 @@ import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.JoinTableNode.JoinType; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -98,7 +98,7 @@ public class PhoenixServerSemiJoin extends PhoenixAbstractSemiJoin { implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns, true, getColumnRefList(0))); QueryPlan leftPlan = implementInput(implementor, 0, leftExprs); - TableRef joinedTable = implementor.getTableRef(); + TableMapping tableMapping = implementor.getTableMapping(); implementor.popContext(); implementor.pushContext(new ImplementorContext(false, true, getColumnRefList(1))); @@ -106,10 +106,10 @@ public class PhoenixServerSemiJoin extends PhoenixAbstractSemiJoin { implementor.popContext(); JoinType type = JoinType.Semi; - implementor.setTableRef(joinedTable); + implementor.setTableMapping(tableMapping); @SuppressWarnings("unchecked") HashJoinInfo hashJoinInfo = new HashJoinInfo( - joinedTable.getTable(), + tableMapping.getPTable(), new ImmutableBytesPtr[] {new ImmutableBytesPtr()}, (List<Expression>[]) (new List[] {leftExprs}), new JoinType[] {type}, new boolean[] {true}, http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index 32c7a90..54b1f27 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@ -18,10 +18,11 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.ImmutableIntList; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.calcite.CalciteUtils; import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; @@ -45,7 +46,6 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; @@ -64,17 +64,18 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { public final RexNode filter; public final ScanOrder scanOrder; public final ScanRanges scanRanges; + public final ImmutableBitSet extendedColumnRef; protected final Long estimatedBytes; protected final float rowCountFactor; public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table) { return create(cluster, table, null, - getDefaultScanOrder(table.unwrap(PhoenixTable.class))); + getDefaultScanOrder(table.unwrap(PhoenixTable.class)), null); } public static PhoenixTableScan create(RelOptCluster cluster, final RelOptTable table, - RexNode filter, final ScanOrder scanOrder) { + RexNode filter, final ScanOrder scanOrder, ImmutableBitSet extendedColumnRef) { final RelTraitSet traits = cluster.traitSetOf(PhoenixConvention.SERVER) .replaceIfs(RelCollationTraitDef.INSTANCE, @@ -87,25 +88,34 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { return scanOrder == ScanOrder.FORWARD ? collations : reverse(collations); } }); - return new PhoenixTableScan(cluster, traits, table, filter, scanOrder); + return new PhoenixTableScan(cluster, traits, table, filter, scanOrder, extendedColumnRef); } - private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, RelOptTable table, RexNode filter, ScanOrder scanOrder) { + private PhoenixTableScan(RelOptCluster cluster, RelTraitSet traits, + RelOptTable table, RexNode filter, ScanOrder scanOrder, + ImmutableBitSet extendedColumnRef) { super(cluster, traits, table); this.filter = filter; this.scanOrder = scanOrder; final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); this.rowCountFactor = phoenixTable.pc.getQueryServices() - .getProps().getFloat(PhoenixRel.ROW_COUNT_FACTOR, 1f); + .getProps().getFloat(PhoenixRel.ROW_COUNT_FACTOR, 1f); try { // TODO simplify this code - PTable pTable = phoenixTable.getTable(); - TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); + TableMapping tableMapping = phoenixTable.tableMapping; + PTable pTable = tableMapping.getPTable(); SelectStatement select = SelectStatement.SELECT_ONE; PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); - ColumnResolver resolver = FromCompiler.getResolver(tableRef); + ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef()); StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); - if (filter != null) { + if (extendedColumnRef == null) { + extendedColumnRef = tableMapping.getDefaultExtendedColumnRef(); + } + if (filter == null) { + this.extendedColumnRef = extendedColumnRef; + } else { + this.extendedColumnRef = extendedColumnRef.union( + tableMapping.getExtendedColumnRef(ImmutableList.of(filter))); // We use a implementor with a special implementation for field access // here, which translates RexFieldAccess into a LiteralExpression // with a sample value. This will achieve 3 goals at a time: @@ -124,12 +134,13 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { } } }; - tmpImplementor.setTableRef(tableRef); + tmpImplementor.setTableMapping(tableMapping); Expression filterExpr = CalciteUtils.toExpression(filter, tmpImplementor); filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr); WhereCompiler.setScanFilter(context, select, filterExpr, true, false); } this.scanRanges = context.getScanRanges(); + // TODO Get estimated byte count based on column reference list. this.estimatedBytes = BaseResultIterators.getEstimatedCount(context, pTable).getSecond(); } catch (SQLException e) { throw new RuntimeException(e); @@ -170,7 +181,8 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { public RelWriter explainTerms(RelWriter pw) { return super.explainTerms(pw) .itemIf("filter", filter, filter != null) - .itemIf("scanOrder", scanOrder, scanOrder != ScanOrder.NONE); + .itemIf("scanOrder", scanOrder, scanOrder != ScanOrder.NONE) + .itemIf("extendedColumns", extendedColumnRef, !extendedColumnRef.isEmpty()); } @Override @@ -188,6 +200,10 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { byteCount = phoenixTable.byteCount; } } + Pair<Integer, Integer> columnRefCount = + phoenixTable.tableMapping.getExtendedColumnReferenceCount(extendedColumnRef); + double extendedColumnMultiplier = 1 + columnRefCount.getFirst() * 10 + columnRefCount.getSecond() * 0.1; + byteCount *= extendedColumnMultiplier; byteCount *= rowCountFactor; if (scanOrder != ScanOrder.NONE) { // We don't want to make a big difference here. The idea is to avoid @@ -230,12 +246,11 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { @Override public QueryPlan implement(Implementor implementor) { final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class); - PTable pTable = phoenixTable.getTable(); - TableRef tableRef = new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false); - implementor.setTableRef(tableRef); + TableMapping tableMapping = phoenixTable.tableMapping; + implementor.setTableMapping(tableMapping); try { PhoenixStatement stmt = new PhoenixStatement(phoenixTable.pc); - ColumnResolver resolver = FromCompiler.getResolver(tableRef); + ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef()); StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); SelectStatement select = SelectStatement.SELECT_ONE; ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList; @@ -255,12 +270,14 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { if (filter != null && !context.getScanRanges().equals(this.scanRanges)) { dynamicFilter = filterExpr; } - projectColumnFamilies(context.getScan(), phoenixTable.mappedColumns, columnRefList); + tableMapping.setupScanForExtendedTable(context.getScan(), extendedColumnRef, context.getConnection()); + projectColumnFamilies(context.getScan(), tableMapping.getMappedColumns(), columnRefList); if (implementor.getCurrentContext().forceProject) { - TupleProjector tupleProjector = implementor.createTupleProjector(); + boolean retainPKColumns = implementor.getCurrentContext().retainPKColumns; + TupleProjector tupleProjector = tableMapping.createTupleProjector(retainPKColumns); TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); - PTable projectedTable = implementor.createProjectedTable(); - implementor.setTableRef(new TableRef(projectedTable)); + PTable projectedTable = tableMapping.createProjectedTable(retainPKColumns); + implementor.setTableMapping(new TableMapping(projectedTable)); } Integer limit = null; OrderBy orderBy = scanOrder == ScanOrder.NONE ? @@ -269,7 +286,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { OrderBy.FWD_ROW_KEY_ORDER_BY : OrderBy.REV_ROW_KEY_ORDER_BY); ParallelIteratorFactory iteratorFactory = null; - return new ScanPlan(context, select, tableRef, RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, dynamicFilter); + return new ScanPlan(context, select, tableMapping.getTableRef(), RowProjector.EMPTY_PROJECTOR, limit, orderBy, iteratorFactory, true, dynamicFilter); } catch (SQLException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java index 3d0aa22..677baa5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java @@ -104,7 +104,7 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume } @Override public RowProjector getProjector() { - return phoenixImplementor.createRowProjector(); + return phoenixImplementor.getTableMapping().createRowProjector(); } @Override public ResultIterator iterator(ParallelScanGrouper scanGrouper) http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java index 0d64868..3076890 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java @@ -10,12 +10,12 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Uncollect; import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.execute.UnnestArrayPlan; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; public class PhoenixUncollect extends Uncollect implements PhoenixRel { @@ -56,8 +56,8 @@ public class PhoenixUncollect extends Uncollect implements PhoenixRel { } catch (SQLException e) { throw new RuntimeException(e); } - PTable projectedTable = implementor.createProjectedTable(); - implementor.setTableRef(new TableRef(projectedTable)); + PTable projectedTable = implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns); + implementor.setTableMapping(new TableMapping(projectedTable)); return new UnnestArrayPlan(plan, arrayExpression, false); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java index 9af3f52..83947ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java @@ -28,6 +28,7 @@ import org.apache.calcite.rex.RexLiteral; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.TableMapping; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; @@ -119,12 +120,13 @@ public class PhoenixValues extends Values implements PhoenixRel { TupleProjector projector = implementor.project(exprs); literalResult.add(projector.projectResults(baseTuple)); } - PTable projectedTable = implementor.createProjectedTable(); - implementor.setTableRef(new TableRef(projectedTable)); + PTable projectedTable = implementor.getTableMapping().createProjectedTable(implementor.getCurrentContext().retainPKColumns); + TableMapping tableMapping = new TableMapping(projectedTable); + implementor.setTableMapping(tableMapping); try { PhoenixStatement stmt = new PhoenixStatement(phoenixConnection); - ColumnResolver resolver = FromCompiler.getResolver(implementor.getTableRef()); + ColumnResolver resolver = FromCompiler.getResolver(tableMapping.getTableRef()); StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt)); return new LiteralResultIterationPlan(literalResult, context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null); } catch (SQLException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java index a6f3a59..e932fad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixFilterScanMergeRule.java @@ -44,6 +44,7 @@ public class PhoenixFilterScanMergeRule extends RelOptRule { PhoenixTableScan scan = call.rel(1); assert scan.filter == null : "predicate should have ensured no filter"; call.transformTo(PhoenixTableScan.create( - scan.getCluster(), scan.getTable(), filter.getCondition(), scan.scanOrder)); + scan.getCluster(), scan.getTable(), filter.getCondition(), + scan.scanOrder, scan.extendedColumnRef)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java index b38324c..b369e48 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixForwardTableScanRule.java @@ -43,7 +43,8 @@ public class PhoenixForwardTableScanRule extends RelOptRule { for (RelCollation candidate : scan.getTable().getCollationList()) { if (candidate.satisfies(collation)) { RelNode newRel = PhoenixTableScan.create( - scan.getCluster(), scan.getTable(), scan.filter, ScanOrder.FORWARD); + scan.getCluster(), scan.getTable(), scan.filter, + ScanOrder.FORWARD, scan.extendedColumnRef); if (sort.offset != null || sort.fetch != null) { newRel = sort.copy( sort.getTraitSet().replace(RelCollations.EMPTY), http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java index 0a6f000..715df8b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixReverseTableScanRule.java @@ -47,7 +47,8 @@ public class PhoenixReverseTableScanRule extends RelOptRule { for (RelCollation candidate : scan.getTable().getCollationList()) { if (CalciteUtils.reverseCollation(candidate).satisfies(collation)) { RelNode newRel = PhoenixTableScan.create( - scan.getCluster(), scan.getTable(), scan.filter, ScanOrder.REVERSE); + scan.getCluster(), scan.getTable(), scan.filter, + ScanOrder.REVERSE, scan.extendedColumnRef); if (sort.offset != null || sort.fetch != null) { newRel = sort.copy( sort.getTraitSet().replace(RelCollations.EMPTY), http://git-wip-us.apache.org/repos/asf/phoenix/blob/aa0bf583/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixTableScanColumnRefRule.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixTableScanColumnRefRule.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixTableScanColumnRefRule.java new file mode 100644 index 0000000..9076593 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixTableScanColumnRefRule.java @@ -0,0 +1,50 @@ +package org.apache.phoenix.calcite.rules; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.calcite.rel.PhoenixTableScan; +import com.google.common.base.Predicate; + +public class PhoenixTableScanColumnRefRule extends RelOptRule { + + /** Predicate that returns true if a table scan has extended columns. */ + private static final Predicate<PhoenixTableScan> APPLICABLE_TABLE_SCAN = + new Predicate<PhoenixTableScan>() { + @Override + public boolean apply(PhoenixTableScan phoenixTableScan) { + return phoenixTableScan.getTable() + .unwrap(PhoenixTable.class).tableMapping.hasExtendedColumns(); + } + }; + + public static final PhoenixTableScanColumnRefRule INSTANCE = new PhoenixTableScanColumnRefRule(); + + private PhoenixTableScanColumnRefRule() { + super( + operand(Project.class, + operand(PhoenixTableScan.class, null, APPLICABLE_TABLE_SCAN, any()))); + } + + @Override + public void onMatch(RelOptRuleCall call) { + Project project = call.rel(0); + PhoenixTableScan scan = call.rel(1); + ImmutableBitSet bitSet = scan.getTable().unwrap(PhoenixTable.class) + .tableMapping.getExtendedColumnRef(project.getProjects()); + if (bitSet.contains(scan.extendedColumnRef)) { + return; + } + + call.transformTo( + project.copy( + project.getTraitSet(), + PhoenixTableScan.create( + scan.getCluster(), scan.getTable(), + scan.filter, scan.scanOrder, bitSet), + project.getProjects(), + project.getRowType())); + } +}