Repository: phoenix Updated Branches: refs/heads/calcite bb519e51c -> 1f5ee4fcc
PHOENIX-2202 Implement PhoenixUncollect Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1f5ee4fc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1f5ee4fc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1f5ee4fc Branch: refs/heads/calcite Commit: 1f5ee4fcc103e9a247a422e4704eda89e3132342 Parents: bb519e5 Author: maryannxue <wei....@intel.com> Authored: Mon Aug 24 13:49:04 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Mon Aug 24 13:49:04 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 46 +++++++++++++++ .../apache/phoenix/calcite/PhoenixTable.java | 10 +++- .../phoenix/calcite/rel/PhoenixUncollect.java | 59 ++++++++++++++++++++ .../calcite/rules/PhoenixConverterRules.java | 26 +++++++++ .../apache/phoenix/execute/UnnestArrayPlan.java | 10 ++++ 5 files changed, 148 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index 4f22873..60fa6ea 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -289,6 +289,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { ensureTableCreated(url, ATABLE_NAME); initATableValues(getOrganizationId(), null, url); initJoinTableValues(url, null, null); + initArrayTable(); createIndices( "CREATE INDEX IDX1 ON aTable (a_string) INCLUDE (b_string, x_integer)", "CREATE INDEX IDX2 ON aTable (b_string) INCLUDE (a_string, y_integer)", @@ -300,6 +301,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { connection.createStatement().execute("UPDATE STATISTICS " + JOIN_ITEM_TABLE_FULL_NAME); connection.createStatement().execute("UPDATE STATISTICS " + JOIN_SUPPLIER_TABLE_FULL_NAME); connection.createStatement().execute("UPDATE STATISTICS " + JOIN_ORDER_TABLE_FULL_NAME); + connection.createStatement().execute("UPDATE STATISTICS " + SCORES_TABLE_NAME); connection.createStatement().execute("UPDATE STATISTICS IDX1"); connection.createStatement().execute("UPDATE STATISTICS IDX2"); connection.createStatement().execute("UPDATE STATISTICS IDX_FULL"); @@ -318,6 +320,33 @@ public class CalciteIT extends BaseClientManagedTimeIT { conn.close(); } + protected static final String SCORES_TABLE_NAME = "scores"; + + protected void initArrayTable() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + conn.createStatement().execute( + "CREATE TABLE " + SCORES_TABLE_NAME + + "(student_id INTEGER PRIMARY KEY, scores INTEGER[])"); + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + SCORES_TABLE_NAME + + " VALUES(?, ?)"); + stmt.setInt(1, 1); + stmt.setArray(2, conn.createArrayOf("INTEGER", new Integer[] {85, 80, 82})); + stmt.execute(); + stmt.setInt(1, 2); + stmt.setArray(2, null); + stmt.execute(); + stmt.setInt(1, 3); + stmt.setArray(2, conn.createArrayOf("INTEGER", new Integer[] {87, 88, 80})); + stmt.execute(); + conn.commit(); + } catch (TableAlreadyExistsException e) { + } + conn.close(); + } + @Test public void testTableScan() throws Exception { start().sql("select * from aTable where a_string = 'a'") .explainIs("PhoenixToEnumerableConverter\n" + @@ -1006,6 +1035,23 @@ public class CalciteIT extends BaseClientManagedTimeIT { .close(); } + @Test public void testUnnest() { + start().sql("SELECT t.s FROM UNNEST((SELECT scores FROM " + SCORES_TABLE_NAME + ")) AS t(s)") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixUncollect\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerProject(EXPR$0=[$1])\n" + + " PhoenixTableScan(table=[[phoenix, SCORES]])\n") + .resultIs(new Object[][] { + {85}, + {80}, + {82}, + {87}, + {88}, + {80}}) + .close(); + } + @Test public void testSelectFromView() { start().sql("select * from v") .explainIs("PhoenixToEnumerableConverter\n" + http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index 1ea0be3..cfda441 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -67,9 +67,13 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { public RelDataType getRowType(RelDataTypeFactory typeFactory) { final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); for (PColumn pColumn : pTable.getColumns()) { - final int sqlTypeId = pColumn.getDataType().getSqlType(); + final PDataType baseType = + pColumn.getDataType().isArrayType() ? + PDataType.fromTypeId(pColumn.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE) + : pColumn.getDataType(); + final int sqlTypeId = baseType.getResultSetSqlType(); final PDataType pDataType = PDataType.fromTypeId(sqlTypeId); - final SqlTypeName sqlTypeName1 = SqlTypeName.valueOf(pDataType.isArrayType() ? PDataType.fromTypeId(pDataType.getSqlType() - PDataType.ARRAY_TYPE_BASE).getSqlTypeName() : pDataType.getSqlTypeName()); + final SqlTypeName sqlTypeName1 = SqlTypeName.valueOf(pDataType.getSqlTypeName()); final Integer maxLength = pColumn.getMaxLength(); final Integer scale = pColumn.getScale(); RelDataType type; @@ -80,7 +84,7 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { } else { type = typeFactory.createSqlType(sqlTypeName1); } - if (pDataType.isArrayType()) { + if (pColumn.getDataType().isArrayType()) { final Integer arraySize = pColumn.getArraySize(); type = typeFactory.createArrayType(type, arraySize == null ? -1 : arraySize); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java new file mode 100644 index 0000000..aa53ae4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java @@ -0,0 +1,59 @@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.Arrays; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Uncollect; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.execute.UnnestArrayPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.schema.types.PDataType; + +public class PhoenixUncollect extends Uncollect implements PhoenixRel { + + public static PhoenixUncollect create(RelNode input) { + RelOptCluster cluster = input.getCluster(); + RelTraitSet traits = cluster.traitSetOf(PhoenixRel.CLIENT_CONVENTION); + return new PhoenixUncollect(cluster, traits, input); + } + + private PhoenixUncollect(RelOptCluster cluster, RelTraitSet traitSet, + RelNode child) { + super(cluster, traitSet, child); + } + + @Override + public PhoenixUncollect copy(RelTraitSet traitSet, + RelNode newInput) { + return create(newInput); + } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + if (getInput().getConvention() != PhoenixRel.CLIENT_CONVENTION) + return planner.getCostFactory().makeInfiniteCost(); + + return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR); + } + + @Override + public QueryPlan implement(Implementor implementor) { + QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput()); + Expression arrayExpression = implementor.newColumnExpression(0); + @SuppressWarnings("rawtypes") + PDataType baseType = PDataType.fromTypeId(arrayExpression.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE); + try { + implementor.project(Arrays.<Expression> asList(LiteralExpression.newConstant(null, baseType))); + } catch (SQLException e) { + throw new RuntimeException(e); + } + return new UnnestArrayPlan(plan, arrayExpression, false); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java index 4179e0a..210306d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java @@ -24,6 +24,7 @@ import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.Uncollect; import org.apache.calcite.rel.core.Union; import org.apache.calcite.rel.logical.LogicalAggregate; import org.apache.calcite.rel.logical.LogicalFilter; @@ -49,6 +50,7 @@ import org.apache.phoenix.calcite.rel.PhoenixServerProject; import org.apache.phoenix.calcite.rel.PhoenixServerSort; import org.apache.phoenix.calcite.rel.PhoenixToClientConverter; import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter; +import org.apache.phoenix.calcite.rel.PhoenixUncollect; import org.apache.phoenix.calcite.rel.PhoenixUnion; import org.apache.phoenix.calcite.rel.PhoenixValues; @@ -84,6 +86,7 @@ public class PhoenixConverterRules { PhoenixClientJoinRule.INSTANCE, PhoenixServerJoinRule.INSTANCE, PhoenixValuesRule.INSTANCE, + PhoenixUncollectRule.INSTANCE, }; public static final RelOptRule[] CONVERTIBLE_RULES = { @@ -104,6 +107,7 @@ public class PhoenixConverterRules { PhoenixClientJoinRule.CONVERTIBLE, PhoenixServerJoinRule.CONVERTIBLE, PhoenixValuesRule.INSTANCE, + PhoenixUncollectRule.INSTANCE, }; /** Base class for planner rules that convert a relational expression to @@ -598,6 +602,28 @@ public class PhoenixConverterRules { } /** + * Rule to convert a {@link org.apache.calcite.rel.core.Uncollect} to a + * {@link PhoenixUncollect}. + */ + public static class PhoenixUncollectRule extends PhoenixConverterRule { + + private static final PhoenixUncollectRule INSTANCE = new PhoenixUncollectRule(); + + private PhoenixUncollectRule() { + super(Uncollect.class, Convention.NONE, + PhoenixRel.CLIENT_CONVENTION, "PhoenixUncollectRule"); + } + + public RelNode convert(RelNode rel) { + final Uncollect uncollect = (Uncollect) rel; + return PhoenixUncollect.create( + convert( + uncollect.getInput(), + uncollect.getInput().getTraitSet().replace(out))); + } + } + + /** * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect} * to an {@link PhoenixIntersectRel}. o/ http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java index c4a6b20..125baf3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.compile.ExplainPlan; +import org.apache.phoenix.compile.OrderByCompiler.OrderBy; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.expression.BaseSingleExpression; import org.apache.phoenix.expression.BaseTerminalExpression; @@ -180,4 +181,13 @@ public class UnnestArrayPlan extends DelegateQueryPlan { return PInteger.INSTANCE; } } + + @Override + public QueryPlan limit(Integer limit) { + if (limit == null) + return this; + + return new ClientScanPlan(this.getContext(), this.getStatement(), this.getTableRef(), + this.getProjector(), limit, null, OrderBy.EMPTY_ORDER_BY, this); + } }