Repository: phoenix Updated Branches: refs/heads/calcite 3b70c6b98 -> 7b2296f15
PHOENIX-2679 Implement column family schema structure in Calcite-Phoenix Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0d3ff585 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0d3ff585 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0d3ff585 Branch: refs/heads/calcite Commit: 0d3ff585842f85b7b5a2aa90801faac82bfd1a82 Parents: 3b70c6b Author: maryannxue <maryann....@gmail.com> Authored: Mon Dec 19 21:44:54 2016 -0800 Committer: maryannxue <maryann....@gmail.com> Committed: Mon Dec 19 21:44:54 2016 -0800 ---------------------------------------------------------------------- .../apache/phoenix/calcite/PhoenixTable.java | 13 +- .../apache/phoenix/calcite/TableMapping.java | 151 ++++++++++++++++++- 2 files changed, 157 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/0d3ff585/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index fe8d6de..b29969f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -15,6 +15,8 @@ import org.apache.calcite.rel.RelFieldCollation.Direction; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.schema.CustomColumnResolvingTable; import org.apache.calcite.schema.Statistic; import org.apache.calcite.schema.TranslatableTable; import org.apache.calcite.schema.impl.AbstractTable; @@ -51,7 +53,8 @@ import com.google.common.collect.Lists; * Implementation of Calcite {@link org.apache.calcite.schema.Table} SPI for * Phoenix. */ -public class PhoenixTable extends AbstractTable implements TranslatableTable { +public class PhoenixTable extends AbstractTable + implements TranslatableTable, CustomColumnResolvingTable { public final TableMapping tableMapping; public final ImmutableBitSet pkBitSet; public final RelCollation collation; @@ -131,7 +134,7 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { RelDataType type = CalciteUtils.pDataTypeToRelDataType( typeFactory, pColumn.getDataType(), pColumn.getMaxLength(), pColumn.getScale(), pColumn.getArraySize()); - builder.add(pColumn.getName().getString(), type); + builder.add(tableMapping.getColumnNames().get(i), type); builder.nullable(pColumn.isNullable()); } return builder.build(); @@ -166,4 +169,10 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { } }; } + + @Override + public List<org.apache.calcite.util.Pair<RelDataTypeField, List<String>>> resolveColumn( + RelDataType rowType, RelDataTypeFactory typeFactory, List<String> names) { + return tableMapping.resolveColumn(rowType, typeFactory, names); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0d3ff585/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java index e27843c..161c618 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/TableMapping.java @@ -4,18 +4,28 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.sql.SQLException; +import java.util.AbstractList; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.Map.Entry; import org.apache.calcite.plan.RelOptUtil.InputFinder; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.StructKind; import org.apache.calcite.rex.RexNode; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Pair; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ExpressionProjector; @@ -27,6 +37,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; @@ -44,6 +55,7 @@ import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.SchemaUtil; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class TableMapping { @@ -52,6 +64,9 @@ public class TableMapping { private final List<PColumn> mappedColumns; private final int extendedColumnsOffset; private final TableRef extendedTableRef; + // For column resolving + private final List<String> names = Lists.newArrayList(); + private final Map<String, Map<String, Integer>> groupMap = Maps.newHashMap(); public TableMapping(PTable table) { this.tableRef = new TableRef(table); @@ -59,6 +74,7 @@ public class TableMapping { this.mappedColumns = getMappedColumns(table); this.extendedColumnsOffset = mappedColumns.size(); this.extendedTableRef = null; + init(); } public TableMapping(TableRef tableRef, TableRef dataTableRef, boolean extend) throws SQLException { @@ -88,6 +104,22 @@ public class TableMapping { dataTable.getAutoPartitionSeqName(), dataTable.isAppendOnlySchema()); this.extendedTableRef = new TableRef(extendedTable); } + init(); + } + + private void init() { + for (int i = 0; i < mappedColumns.size(); i++) { + PColumn column = mappedColumns.get(i); + String familyName = column.getFamilyName() == null ? "" : column.getFamilyName().getString(); + String translatedName = SchemaUtil.getCaseSensitiveColumnDisplayName(familyName, column.getName().getString()); + names.add(translatedName); + Map<String, Integer> subMap = groupMap.get(familyName); + if (subMap == null) { + subMap = Maps.newHashMap(); + groupMap.put(familyName, subMap); + } + subMap.put(column.getName().getString(), i); + } } public TableRef getTableRef() { @@ -101,7 +133,11 @@ public class TableMapping { public TableRef getDataTableRef() { return dataTableRef; } - + + public List<String> getColumnNames() { + return names; + } + public List<PColumn> getMappedColumns() { return mappedColumns; } @@ -109,6 +145,111 @@ public class TableMapping { public boolean hasExtendedColumns() { return extendedTableRef != null; } + + public List<Pair<RelDataTypeField, List<String>>> resolveColumn( + RelDataType rowType, RelDataTypeFactory typeFactory, List<String> names) { + List<Pair<RelDataTypeField, List<String>>> ret = new ArrayList<>(); + if (names.size() >= 2) { + Map<String, Integer> subMap = groupMap.get(names.get(0)); + if (subMap != null) { + Integer index = subMap.get(names.get(1)); + if (index != null) { + ret.add( + new Pair<RelDataTypeField, List<String>>( + rowType.getFieldList().get(index), + names.subList(2, names.size()))); + } + } + } + + final String columnName = names.get(0); + final List<String> remainder = names.subList(1, names.size()); + for (int i = 0; i < this.names.size(); i++) { + if (columnName.equals(this.names.get(i))) { + ret.add( + new Pair<RelDataTypeField, List<String>>( + rowType.getFieldList().get(i), remainder)); + return ret; + } + } + + final List<String> priorityGroups = Arrays.asList("", QueryConstants.DEFAULT_COLUMN_FAMILY); + for (String group : priorityGroups) { + Map<String, Integer> subMap = groupMap.get(group); + if (subMap != null) { + Integer index = subMap.get(columnName); + if (index != null) { + ret.add( + new Pair<RelDataTypeField, List<String>>( + rowType.getFieldList().get(index), remainder)); + return ret; + } + } + } + for (Map.Entry<String, Map<String, Integer>> entry : groupMap.entrySet()) { + if (priorityGroups.contains(entry.getKey())) { + continue; + } + Integer index = entry.getValue().get(columnName); + if (index != null) { + ret.add( + new Pair<RelDataTypeField, List<String>>( + rowType.getFieldList().get(index), remainder)); + } + } + + if (ret.isEmpty() && names.size() == 1) { + Map<String, Integer> subMap = groupMap.get(columnName); + if (subMap != null) { + List<Map.Entry<String, Integer>> entries = + new ArrayList<>(subMap.entrySet()); + Collections.sort( + entries, + new Comparator<Map.Entry<String, Integer>>() { + @Override public int compare( + Entry<String, Integer> o1, Entry<String, Integer> o2) { + return o1.getValue() - o2.getValue(); + } + }); + ret.add( + new Pair<RelDataTypeField, List<String>>( + new RelDataTypeFieldImpl( + columnName, -1, + createStructType( + rowType, + typeFactory, + entries)), + remainder)); + } + } + + return ret; + } + + private static RelDataType createStructType( + final RelDataType rowType, + RelDataTypeFactory typeFactory, + final List<Map.Entry<String, Integer>> entries) { + return typeFactory.createStructType( + StructKind.PEEK_FIELDS, + new AbstractList<RelDataType>() { + @Override public RelDataType get(int index) { + final int i = entries.get(index).getValue(); + return rowType.getFieldList().get(i).getType(); + } + @Override public int size() { + return entries.size(); + } + }, + new AbstractList<String>() { + @Override public String get(int index) { + return entries.get(index).getKey(); + } + @Override public int size() { + return entries.size(); + } + }); + } public Expression newColumnExpression(int index) { ColumnRef colRef = new ColumnRef( @@ -139,8 +280,8 @@ public class TableMapping { } return builder.build(); } - - public Pair<Integer, Integer> getExtendedColumnReferenceCount(ImmutableBitSet columnRef) { + + public org.apache.hadoop.hbase.util.Pair<Integer, Integer> getExtendedColumnReferenceCount(ImmutableBitSet columnRef) { Set<String> cf = Sets.newHashSet(); int columnCount = 0; for (int i = extendedColumnsOffset; i < mappedColumns.size(); i++) { @@ -151,7 +292,7 @@ public class TableMapping { columnCount++; } } - return new Pair<Integer, Integer>(cf.size(), columnCount); + return new org.apache.hadoop.hbase.util.Pair<Integer, Integer>(cf.size(), columnCount); } public PTable createProjectedTable(boolean retainPKColumns) {