Repository: phoenix Updated Branches: refs/heads/calcite 59abec558 -> df370f438
PHOENIX-2259 SqlValidator exception if salted index is created on non-salted table Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/df370f43 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/df370f43 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/df370f43 Branch: refs/heads/calcite Commit: df370f438ebc5abc98e9540488964f7181673cf4 Parents: 59abec5 Author: maryannxue <wei....@intel.com> Authored: Mon Sep 14 23:27:21 2015 -0400 Committer: maryannxue <wei....@intel.com> Committed: Mon Sep 14 23:27:21 2015 -0400 ---------------------------------------------------------------------- .../org/apache/phoenix/calcite/CalciteIT.java | 134 +++++++++++++++++++ .../apache/phoenix/calcite/CalciteUtils.java | 3 +- .../apache/phoenix/calcite/PhoenixSchema.java | 2 + .../apache/phoenix/calcite/PhoenixTable.java | 6 +- .../calcite/rel/PhoenixRelImplementorImpl.java | 12 +- .../phoenix/calcite/rel/PhoenixTableScan.java | 12 +- 6 files changed, 160 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/df370f43/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java index 10ccd8b..b4f187f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java @@ -267,6 +267,7 @@ public class CalciteIT extends BaseClientManagedTimeIT { initATableValues(getOrganizationId(), null, url); initJoinTableValues(url, null, null); initArrayTable(); + initSaltedTables(); createIndices( "CREATE INDEX IDX1 ON aTable (a_string) INCLUDE (b_string, x_integer)", "CREATE INDEX IDX2 ON aTable (b_string) INCLUDE (a_string, y_integer)", @@ -279,6 +280,8 @@ public class CalciteIT extends BaseClientManagedTimeIT { connection.createStatement().execute("UPDATE STATISTICS " + JOIN_SUPPLIER_TABLE_FULL_NAME); connection.createStatement().execute("UPDATE STATISTICS " + JOIN_ORDER_TABLE_FULL_NAME); connection.createStatement().execute("UPDATE STATISTICS " + SCORES_TABLE_NAME); + connection.createStatement().execute("UPDATE STATISTICS " + SALTED_TABLE_NAME); + connection.createStatement().execute("UPDATE STATISTICS IDX_" + SALTED_TABLE_NAME); connection.createStatement().execute("UPDATE STATISTICS IDX1"); connection.createStatement().execute("UPDATE STATISTICS IDX2"); connection.createStatement().execute("UPDATE STATISTICS IDX_FULL"); @@ -327,6 +330,71 @@ public class CalciteIT extends BaseClientManagedTimeIT { conn.close(); } + protected static final String NOSALT_TABLE_NAME = "nosalt_test_table"; + protected static final String NOSALT_TABLE_SALTED_INDEX_NAME = "idxsalted_nosalt_test_table"; + protected static final String SALTED_TABLE_NAME = "salted_test_table"; + protected static final String SALTED_TABLE_NOSALT_INDEX_NAME = "idx_salted_test_table"; + protected static final String SALTED_TABLE_SALTED_INDEX_NAME = "idxsalted_salted_test_table"; + + protected void initSaltedTables() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + conn.createStatement().execute( + "CREATE TABLE " + NOSALT_TABLE_NAME + " (mypk0 INTEGER NOT NULL, mypk1 INTEGER NOT NULL, col0 INTEGER, col1 INTEGER CONSTRAINT pk PRIMARY KEY (mypk0, mypk1))"); + PreparedStatement stmt = conn.prepareStatement( + "UPSERT INTO " + NOSALT_TABLE_NAME + + " VALUES(?, ?, ?, ?)"); + stmt.setInt(1, 1); + stmt.setInt(2, 2); + stmt.setInt(3, 3); + stmt.setInt(4, 4); + stmt.execute(); + stmt.setInt(1, 2); + stmt.setInt(2, 3); + stmt.setInt(3, 4); + stmt.setInt(4, 5); + stmt.execute(); + stmt.setInt(1, 3); + stmt.setInt(2, 4); + stmt.setInt(3, 5); + stmt.setInt(4, 6); + stmt.execute(); + conn.commit(); + + conn.createStatement().execute("CREATE INDEX " + NOSALT_TABLE_SALTED_INDEX_NAME + " ON " + NOSALT_TABLE_NAME + " (col0) SALT_BUCKETS=4"); + conn.commit(); + + conn.createStatement().execute( + "CREATE TABLE " + SALTED_TABLE_NAME + " (mypk0 INTEGER NOT NULL, mypk1 INTEGER NOT NULL, col0 INTEGER, col1 INTEGER CONSTRAINT pk PRIMARY KEY (mypk0, mypk1)) SALT_BUCKETS=4"); + stmt = conn.prepareStatement( + "UPSERT INTO " + SALTED_TABLE_NAME + + " VALUES(?, ?, ?, ?)"); + stmt.setInt(1, 1); + stmt.setInt(2, 2); + stmt.setInt(3, 3); + stmt.setInt(4, 4); + stmt.execute(); + stmt.setInt(1, 2); + stmt.setInt(2, 3); + stmt.setInt(3, 4); + stmt.setInt(4, 5); + stmt.execute(); + stmt.setInt(1, 3); + stmt.setInt(2, 4); + stmt.setInt(3, 5); + stmt.setInt(4, 6); + stmt.execute(); + conn.commit(); + + conn.createStatement().execute("CREATE INDEX " + SALTED_TABLE_NOSALT_INDEX_NAME + " ON " + SALTED_TABLE_NAME + " (col0)"); + conn.createStatement().execute("CREATE INDEX " + SALTED_TABLE_SALTED_INDEX_NAME + " ON " + SALTED_TABLE_NAME + " (col1) INCLUDE (col0) SALT_BUCKETS=4"); + conn.commit(); + } catch (TableAlreadyExistsException e) { + } + conn.close(); + } + @Test public void testTableScan() throws Exception { start(false).sql("select * from aTable where a_string = 'a'") .explainIs("PhoenixToEnumerableConverter\n" + @@ -1224,6 +1292,72 @@ public class CalciteIT extends BaseClientManagedTimeIT { {"00D300000000XHP", "00A423122312312", "a"}}) .close(); } + + @Test public void testSaltedIndex() { + start(true).sql("select count(*) from " + NOSALT_TABLE_NAME + " where col0 > 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n") + .resultIs(new Object[][]{{2L}}) + .close(); + start(true).sql("select mypk0, mypk1, col0 from " + NOSALT_TABLE_NAME + " where col0 <= 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n") + .resultIs(new Object[][] { + {2, 3, 4}, + {1, 2, 3}}) + .close(); + start(true).sql("select * from " + SALTED_TABLE_NAME + " where mypk0 < 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[<($0, 3)])\n") + .resultIs(new Object[][] { + {1, 2, 3, 4}, + {2, 3, 4, 5}}) + .close(); + start(true).sql("select count(*) from " + SALTED_TABLE_NAME + " where col0 > 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n") + .resultIs(new Object[][]{{2L}}) + .close(); + start(true).sql("select mypk0, mypk1, col0 from " + SALTED_TABLE_NAME + " where col0 <= 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n") + .resultIs(new Object[][] { + {2, 3, 4}, + {1, 2, 3}}) + .close(); + start(true).sql("select count(*) from " + SALTED_TABLE_NAME + " where col1 > 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 4)])\n") + .resultIs(new Object[][]{{2L}}) + .close(); + start(true).sql("select * from " + SALTED_TABLE_NAME + " where col1 <= 5") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 5)])\n") + .resultIs(new Object[][] { + {1, 2, 3, 4}, + {2, 3, 4, 5}}) + .close(); + start(true).sql("select * from " + SALTED_TABLE_NAME + " s1, " + SALTED_TABLE_NAME + " s2 where s1.mypk0 = s2.mypk0 and s1.mypk1 = s2.mypk1 and s1.mypk0 > 1 and s2.col1 < 6") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixToClientConverter\n" + + " PhoenixServerJoin(condition=[AND(=($0, $4), =($1, $5))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[>($0, 1)])\n" + + " PhoenixToClientConverter\n" + + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[<($3, 6)])\n") + .resultIs(new Object[][] { + {2, 3, 4, 5, 2, 3, 4, 5}}) + .close(); + } /** Tests a simple command that is defined in Phoenix's extended SQL parser. */ @Ignore http://git-wip-us.apache.org/repos/asf/phoenix/blob/df370f43/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java index ef67de0..e5a2372 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java @@ -76,6 +76,7 @@ import org.apache.phoenix.expression.function.SqrtFunction; import org.apache.phoenix.expression.function.TrimFunction; import org.apache.phoenix.expression.function.UpperFunction; import org.apache.phoenix.parse.JoinTableNode.JoinType; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.types.PDataType; @@ -797,7 +798,7 @@ public class CalciteUtils { PDataType fromDataType = childExpr.getDataType(); Expression expr = childExpr; - if(fromDataType != null) { + if(fromDataType != null && implementor.getTableRef().getTable().getType() != PTableType.INDEX) { expr = convertToRoundExpressionIfNeeded(fromDataType, targetDataType, childExpr); } return CoerceExpression.create(expr, targetDataType, SortOrder.getDefault(), expr.getMaxLength(), implementor.getTableRef().getTable().rowKeyOrderOptimizable()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/df370f43/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index 375e4a4..55c9d6a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@ -22,6 +22,7 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.IndexUtil; @@ -194,6 +195,7 @@ public class PhoenixSchema implements Schema { StringBuffer sb = new StringBuffer(); sb.append("SELECT"); for (PColumn column : index.getColumns()) { + if (column == SaltingUtil.SALTING_COLUMN) continue; String indexColumnName = column.getName().getString(); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); sb.append(",").append("\"").append(dataColumnName).append("\""); http://git-wip-us.apache.org/repos/asf/phoenix/blob/df370f43/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java index 5c786b5..92ed628 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java @@ -24,6 +24,7 @@ import org.apache.phoenix.calcite.rel.PhoenixTableScan; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.types.PDataType; @@ -67,6 +68,7 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { public RelDataType getRowType(RelDataTypeFactory typeFactory) { final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder(); for (PColumn pColumn : pTable.getColumns()) { + if (pColumn == SaltingUtil.SALTING_COLUMN) continue; final PDataType baseType = pColumn.getDataType().isArrayType() ? PDataType.fromTypeId(pColumn.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE) @@ -123,7 +125,9 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable { @Override public List<RelCollation> getCollations() { - return ImmutableList.<RelCollation> of(collation); + return pTable.getBucketNum() == null ? + ImmutableList.<RelCollation> of(collation) + : ImmutableList.<RelCollation>of(); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/df370f43/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index c4ad62d..8b3919b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@ -29,6 +29,7 @@ import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; @@ -51,7 +52,8 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { @Override public ColumnExpression newColumnExpression(int index) { - ColumnRef colRef = new ColumnRef(this.tableRef, index); + int pos = this.tableRef.getTable().getBucketNum() == null ? index : (index + 1); + ColumnRef colRef = new ColumnRef(this.tableRef, pos); return colRef.newColumnExpression(); } @@ -59,7 +61,8 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { @Override public Expression newFieldAccessExpression(String variableId, int index, PDataType type) { TableRef variableDef = runtimeContext.getCorrelateVariableDef(variableId); - Expression fieldAccessExpr = new ColumnRef(variableDef, index).newColumnExpression(); + int pos = variableDef.getTable().getBucketNum() == null ? index : (index + 1); + Expression fieldAccessExpr = new ColumnRef(variableDef, pos).newColumnExpression(); return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr); } @@ -97,6 +100,7 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { public PTable createProjectedTable() { List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList(); for (PColumn column : getTableRef().getTable().getColumns()) { + if (!getCurrentContext().retainPKColumns && column == SaltingUtil.SALTING_COLUMN) continue; sourceColumnRefs.add(new ColumnRef(getTableRef(), column.getPosition())); } @@ -110,8 +114,10 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor { @Override public RowProjector createRowProjector() { List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList(); + int pos = 0; for (PColumn column : getTableRef().getTable().getColumns()) { - Expression expr = newColumnExpression(column.getPosition()); + if (column == SaltingUtil.SALTING_COLUMN) continue; + Expression expr = newColumnExpression(pos++); // Do not use column.position() here. columnProjectors.add(new ExpressionProjector(column.getName().getString(), getTableRef().getTable().getName().getString(), expr, false)); } // TODO get estimate row size http://git-wip-us.apache.org/repos/asf/phoenix/blob/df370f43/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java index 407afd1..b10e594 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java @@ -39,10 +39,12 @@ import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.iterate.ParallelIteratorFactory; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.SchemaUtil; @@ -203,7 +205,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { } projectColumnFamilies(context.getScan(), phoenixTable.getTable(), columnRefList); if (implementor.getCurrentContext().forceProject) { - TupleProjector tupleProjector = createTupleProjector(implementor, phoenixTable.getTable()); + TupleProjector tupleProjector = createTupleProjector(implementor); TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector); PTable projectedTable = implementor.createProjectedTable(); implementor.setTableRef(new TableRef(projectedTable)); @@ -217,12 +219,14 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel { } } - private TupleProjector createTupleProjector(Implementor implementor, PTable table) { + private TupleProjector createTupleProjector(Implementor implementor) { KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0); List<Expression> exprs = Lists.<Expression> newArrayList(); - for (PColumn column : table.getColumns()) { + TableRef tableRef = implementor.getTableRef(); + for (PColumn column : tableRef.getTable().getColumns()) { + if (column == SaltingUtil.SALTING_COLUMN) continue; if (!SchemaUtil.isPKColumn(column) || !implementor.getCurrentContext().retainPKColumns) { - Expression expr = implementor.newColumnExpression(column.getPosition()); + Expression expr = new ColumnRef(tableRef, column.getPosition()).newColumnExpression(); exprs.add(expr); builder.addField(expr); }