http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java index 1197193,0000000..f200a24 mode 100644,000000..100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java @@@ -1,283 -1,0 +1,283 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.calcite; + +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@RunWith(Parameterized.class) +public class CalciteIndexIT extends BaseCalciteIT { + + private final boolean localIndex; + + public CalciteIndexIT(boolean localIndex) { + this.localIndex = localIndex; + } + + @Parameters(name="localIndex = {0}") + public static Collection<Boolean[]> data() { + return Arrays.asList(new Boolean[][] { + { false }, { true } + }); + } + + @Before + public void initTable() throws Exception { + final String url = getUrl(); + final String index = localIndex ? "LOCAL INDEX" : "INDEX"; + initATableValues(getOrganizationId(), null, url); + initSaltedTables(index); + initMultiTenantTables(index); + final Connection connection = DriverManager.getConnection(url); + connection.createStatement().execute("CREATE " + index + " IF NOT EXISTS IDX1 ON aTable (a_string) INCLUDE (b_string, x_integer)"); + connection.createStatement().execute("CREATE " + index + " IF NOT EXISTS IDX2 ON aTable (b_string) INCLUDE (a_string, y_integer)"); + connection.createStatement().execute("CREATE " + index + " IF NOT EXISTS IDX_FULL ON aTable (b_string) INCLUDE (a_string, a_integer, a_date, a_time, a_timestamp, x_decimal, x_long, x_integer, y_integer, a_byte, a_short, a_float, a_double, a_unsigned_float, a_unsigned_double)"); + connection.createStatement().execute("UPDATE STATISTICS ATABLE"); + connection.createStatement().execute("UPDATE STATISTICS " + SALTED_TABLE_NAME); + connection.createStatement().execute("UPDATE STATISTICS IDX_" + SALTED_TABLE_NAME); + connection.createStatement().execute("UPDATE STATISTICS IDX1"); + 
connection.createStatement().execute("UPDATE STATISTICS IDX2"); + connection.createStatement().execute("UPDATE STATISTICS IDX_FULL"); + connection.close(); + } + + @Test public void testIndex() throws Exception { + start(true).sql("select * from aTable where b_string = 'b'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ORGANIZATION_ID=[$1], ENTITY_ID=[$2], A_STRING=[$3], B_STRING=[$0], A_INTEGER=[$4], A_DATE=[$5], A_TIME=[$6], A_TIMESTAMP=[$7], X_DECIMAL=[$8], X_LONG=[$9], X_INTEGER=[$10], Y_INTEGER=[$11], A_BYTE=[$12], A_SHORT=[$13], A_FLOAT=[$14], A_DOUBLE=[$15], A_UNSIGNED_FLOAT=[$16], A_UNSIGNED_DOUBLE=[$17])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_FULL]], filter=[=($0, 'b')])\n") + .close(); + start(true).sql("select x_integer from aTable") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(X_INTEGER=[$4])\n" + + " PhoenixTableScan(table=[[phoenix, IDX1]])\n") + .close(); + start(true).sql("select a_string from aTable order by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(A_STRING=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, IDX1]], scanOrder=[FORWARD])\n") + .close(); + start(true).sql("select a_string from aTable order by organization_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(A_STRING=[$2], ORGANIZATION_ID=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], scanOrder=[FORWARD])\n") + .close(); + start(true).sql("select a_integer from aTable order by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerSort(sort0=[$1], dir0=[ASC])\n" + + " PhoenixServerProject(A_INTEGER=[$4], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n") + .close(); + start(true).sql("select a_string, b_string from aTable where a_string = 'a'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(A_STRING=[$0], B_STRING=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, IDX1]], 
filter=[=($0, 'a')])\n") + .close(); + start(true).sql("select a_string, b_string from aTable where b_string = 'b'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(A_STRING=[$3], B_STRING=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, IDX2]], filter=[=($0, 'b')])\n") + .close(); + start(true).sql("select a_string, b_string, x_integer, y_integer from aTable where b_string = 'b'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(A_STRING=[$3], B_STRING=[$0], X_INTEGER=[$10], Y_INTEGER=[$11])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_FULL]], filter=[=($0, 'b')])\n") + .close(); + start(true).sql("select a_string, count(*) from aTable group by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()], isOrdered=[true])\n" + + " PhoenixTableScan(table=[[phoenix, IDX1]], scanOrder=[FORWARD])\n") + .close(); + } + + @Test public void testSaltedIndex() throws Exception { + start(true).sql("select count(*) from " + NOSALT_TABLE_NAME + " where col0 > 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n") - .resultIs(new Object[][]{{2L}}) ++ .resultIs(false, new Object[][]{{2L}}) + .close(); + start(true).sql("select mypk0, mypk1, col0 from " + NOSALT_TABLE_NAME + " where col0 <= 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {2, 3, 4}, + {1, 2, 3}}) + .close(); + start(true).sql("select * from " + SALTED_TABLE_NAME + " where mypk0 < 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], 
filter=[<($0, 3)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {1, 2, 3, 4}, + {2, 3, 4, 5}}) + .close(); + start(true).sql("select count(*) from " + SALTED_TABLE_NAME + " where col0 > 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n") - .resultIs(new Object[][]{{2L}}) ++ .resultIs(false, new Object[][]{{2L}}) + .close(); + start(true).sql("select mypk0, mypk1, col0 from " + SALTED_TABLE_NAME + " where col0 <= 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {2, 3, 4}, + {1, 2, 3}}) + .close(); + start(true).sql("select count(*) from " + SALTED_TABLE_NAME + " where col1 > 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 4)])\n") - .resultIs(new Object[][]{{2L}}) ++ .resultIs(false, new Object[][]{{2L}}) + .close(); + start(true).sql("select * from " + SALTED_TABLE_NAME + " where col1 <= 5 order by col1") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 5)], scanOrder=[FORWARD])\n") - .resultIs(new Object[][] { ++ .resultIs(true, new Object[][] { + {1, 2, 3, 4}, + {2, 3, 4, 5}}) + .close(); + start(true).sql("select * from " + SALTED_TABLE_NAME + " s1, " + SALTED_TABLE_NAME + " s2 where s1.mypk0 = s2.mypk0 and s1.mypk1 = s2.mypk1 and s1.mypk0 > 1 and s2.col1 < 6") + 
.explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerJoin(condition=[AND(=($0, $4), =($1, $5))], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[>($0, 1)])\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[<(CAST($0):INTEGER, 6)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {2, 3, 4, 5, 2, 3, 4, 5}}) + .close(); + } + + @Test public void testMultiTenant() throws Exception { + Properties props = getConnectionProps(true); + start(props).sql("select * from " + MULTI_TENANT_TABLE) + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {"10", "2", 3, 4, 5}, + {"15", "3", 4, 5, 6}, + {"20", "4", 5, 6, 7}, + {"20", "5", 6, 7, 8}}) + .close(); + + start(props).sql("select * from " + MULTI_TENANT_TABLE + " where tenant_id = '20' and col1 > 1") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(TENANT_ID=[$0], ID=[$2], COL0=[$3], COL1=[CAST($1):INTEGER], COL2=[$4])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[AND(=(CAST($0):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '20'), >(CAST($1):INTEGER, 1))])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {"20", "4", 5, 6, 7}, + {"20", "5", 6, 7, 8}}) + .close(); + + try { + start(props).sql("select * from " + MULTI_TENANT_VIEW1) + .explainIs("") + .close(); + fail("Should have got SQLException."); + } catch (SQLException e) { + } + + props.setProperty("TenantId", "15"); + start(props).sql("select * from " + MULTI_TENANT_TABLE) + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]])\n") - .resultIs(new Object[][] { ++ 
.resultIs(false, new Object[][] { + {"3", 4, 5, 6}}) + .close(); + + start(props).sql("select * from " + MULTI_TENANT_TABLE + " where col1 > 1") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[$2], COL1=[CAST($0):INTEGER], COL2=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 1)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {"3", 4, 5, 6}}) + .close(); + + try { + start(props).sql("select * from " + MULTI_TENANT_VIEW1) + .explainIs("") + .close(); + fail("Should have got SQLException."); + } catch (SQLException e) { + } + + props.setProperty("TenantId", "10"); + start(props).sql("select * from " + MULTI_TENANT_VIEW1) + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {"2", 3, 4, 5}}) + .close(); + + start(props).sql("select * from " + MULTI_TENANT_TABLE + " where col1 > 1") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[$2], COL1=[CAST($0):INTEGER], COL2=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 1)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {"2", 3, 4, 5}}) + .close(); + + start(props).sql("select id, col0 from " + MULTI_TENANT_VIEW1 + " where col0 > 1") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[>(CAST($0):INTEGER, 1)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {"2", 3}}) + .close(); + + props.setProperty("TenantId", "20"); + start(props).sql("select * from " + MULTI_TENANT_VIEW2) + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], 
filter=[>($3, 7)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {"5", 6, 7, 8}}) + .close(); + + start(props).sql("select id, col0 from " + MULTI_TENANT_VIEW2 + " where col0 > 1") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, S2, IDX_MULTITENANT_TEST_VIEW2]], filter=[>(CAST($0):INTEGER, 1)])\n") - .resultIs(new Object[][] { ++ .resultIs(false, new Object[][] { + {"5", 6}}) + .close(); + + start(props).sql("select id, col0 from " + MULTI_TENANT_VIEW2 + " order by col0") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, S2, IDX_MULTITENANT_TEST_VIEW2]], scanOrder=[FORWARD])\n") - .resultIs(new Object[][] { ++ .resultIs(true, new Object[][] { + {"5", 6}}) + .close(); + } + +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java index 52022a8,0000000..000a0c6 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteRuntime.java @@@ -1,148 -1,0 +1,156 @@@ +package org.apache.phoenix.calcite; + +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.ColumnProjector; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.schema.types.PDateArray; ++import org.apache.phoenix.schema.types.PDouble; ++import org.apache.phoenix.schema.types.PFloat; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PLongArray; +import org.apache.phoenix.schema.types.PTime; +import org.apache.phoenix.schema.types.PTimeArray; +import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.schema.types.PTimestampArray; +import org.apache.phoenix.schema.types.PUnsignedDate; +import org.apache.phoenix.schema.types.PUnsignedDateArray; ++import org.apache.phoenix.schema.types.PUnsignedFloat; +import org.apache.phoenix.schema.types.PUnsignedTime; +import org.apache.phoenix.schema.types.PUnsignedTimeArray; +import org.apache.phoenix.schema.types.PUnsignedTimestamp; +import org.apache.phoenix.schema.types.PUnsignedTimestampArray; +import 
org.apache.phoenix.schema.types.PhoenixArray; + +import java.sql.SQLException; +import java.sql.Timestamp; + +/** + * Methods used by code generated by Calcite. + */ +public class CalciteRuntime { + public static Enumerable<Object> toEnumerable2(final ResultIterator iterator, final RowProjector rowProjector) { + return new AbstractEnumerable<Object>() { + @Override + public Enumerator<Object> enumerator() { + return toEnumerator(iterator, rowProjector); + } + }; + } + + public static Enumerable<Object> toEnumerable(final QueryPlan plan) { + try { + return toEnumerable2(plan.iterator(), plan.getProjector()); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + public static Enumerator<Object> toEnumerator(final ResultIterator iterator, final RowProjector rowProjector) { + final int count = rowProjector.getColumnCount(); + return new Enumerator<Object>() { + Object current; + private final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + + @Override + public Object current() { + return current; + } + + @Override + public boolean moveNext() { + try { + final Tuple tuple = iterator.next(); + if (tuple == null) { + current = null; + return false; + } + if (count == 1) { + ColumnProjector projector = rowProjector.getColumnProjector(0); + current = project(tuple, projector); + return true; + } + Object[] array = new Object[count]; + for (int i = 0; i < count; i++) { + ColumnProjector projector = rowProjector.getColumnProjector(i); + array[i] = project(tuple, projector); + } + current = array; + return true; + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private Object project(Tuple tuple, ColumnProjector projector) throws SQLException { + @SuppressWarnings("rawtypes") + PDataType type = projector.getExpression().getDataType(); + if (PDataType.equalsAny( + type, ++ PUnsignedFloat.INSTANCE, ++ PFloat.INSTANCE)) { ++ type = PDouble.INSTANCE; ++ } else if (PDataType.equalsAny( ++ type, + PUnsignedDate.INSTANCE, + 
PDate.INSTANCE, + PUnsignedTime.INSTANCE, + PTime.INSTANCE)) { + type = PLong.INSTANCE; - }else if (PDataType.equalsAny( ++ } else if (PDataType.equalsAny( + type, + PUnsignedDateArray.INSTANCE, + PDateArray.INSTANCE, + PUnsignedTimeArray.INSTANCE, + PTimeArray.INSTANCE)) { + type = PLongArray.INSTANCE; + } + Object value = projector.getValue(tuple, type, ptr); + if (value != null) { + if (type.isArrayType()) { + value = ((PhoenixArray) value).getArray(); + } + if (PDataType.equalsAny( + type, + PUnsignedTimestamp.INSTANCE, + PTimestamp.INSTANCE)) { + value = ((Timestamp) value).getTime(); + } else if (PDataType.equalsAny( + type, + PUnsignedTimestampArray.INSTANCE, + PTimestampArray.INSTANCE)) { + Timestamp[] array = (Timestamp[]) value; + long[] newArray = new long[array.length]; + for (int i = 0; i < array.length; i++) { + newArray[i] = array[i].getTime(); + } + value = newArray; + } + } + + return value; + } + + @Override + public void reset() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + try { + iterator.close(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index 8b2c160,0000000..fd630ab mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@@ -1,331 -1,0 +1,331 @@@ +package org.apache.phoenix.calcite; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.linq4j.tree.Expression; +import 
org.apache.calcite.materialize.MaterializationService; +import org.apache.calcite.schema.*; +import org.apache.calcite.schema.impl.ViewTable; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.parse.ColumnDef; +import org.apache.phoenix.parse.NamedTableNode; +import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ViewType; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.SchemaUtil; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Implementation of Calcite's {@link Schema} SPI for Phoenix. + * + * TODO + * 1) change this to non-caching mode?? + * 2) how to deal with define indexes and views since they require a CalciteSchema + * instance?? 
+ * + */ +public class PhoenixSchema implements Schema { + public static final Factory FACTORY = new Factory(); + + public final PhoenixConnection pc; + + protected final String name; + protected final String schemaName; + protected final SchemaPlus parentSchema; + protected final MetaDataClient client; + + protected final Map<String, Schema> subSchemas; + protected final Map<String, Table> tables; + protected final Map<String, Function> views; + protected final Set<PTable> viewTables; + + protected PhoenixSchema(String name, String schemaName, + SchemaPlus parentSchema, PhoenixConnection pc) { + this.name = name; + this.schemaName = schemaName; + this.parentSchema = parentSchema; + this.pc = pc; + this.client = new MetaDataClient(pc); + this.subSchemas = Maps.newHashMap(); + this.tables = Maps.newHashMap(); + this.views = Maps.newHashMap(); + this.viewTables = Sets.newHashSet(); + } + + private static Schema create(SchemaPlus parentSchema, + String name, Map<String, Object> operand) { + String url = (String) operand.get("url"); + final Properties properties = new Properties(); + for (Map.Entry<String, Object> entry : operand.entrySet()) { + properties.setProperty(entry.getKey(), String.valueOf(entry.getValue())); + } + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + final Connection connection = + DriverManager.getConnection(url, properties); + final PhoenixConnection phoenixConnection = + connection.unwrap(PhoenixConnection.class); + return new PhoenixSchema(name, null, parentSchema, phoenixConnection); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public Table getTable(String name) { + Table table = tables.get(name); + if (table != null) { + return table; + } + + try { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create( + null, + TableName.create(schemaName, name), + ImmutableList.<ColumnDef>of()), pc); + final 
List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + PTable pTable = tables.get(0).getTable(); + if (!isView(pTable)) { + pTable = fixTableMultiTenancy(pTable); + table = new PhoenixTable(pc, pTable); + } + } catch (TableNotFoundException e) { + } catch (SQLException e) { + throw new RuntimeException(e); + } + + if (table == null) { + table = resolveSequence(name); + } + + if (table != null) { + tables.put(name, table); + } + return table; + } + + @Override + public Set<String> getTableNames() { + return tables.keySet(); + } + + @Override + public Collection<Function> getFunctions(String name) { + Function func = views.get(name); + if (func != null) { + return ImmutableList.of(func); + } + + try { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create( + null, + TableName.create(schemaName, name), + ImmutableList.<ColumnDef>of()), pc); + final List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + PTable pTable = tables.get(0).getTable(); + if (isView(pTable)) { + String viewSql = pTable.getViewStatement(); + if (viewSql == null) { + viewSql = "select * from " + + SchemaUtil.getEscapedFullTableName( + pTable.getPhysicalName().getString()); + } + SchemaPlus schema = parentSchema.getSubSchema(this.name); + SchemaPlus viewSqlSchema = + this.schemaName == null ? schema : parentSchema; + func = ViewTable.viewMacro(schema, viewSql, + CalciteSchema.from(viewSqlSchema).path(null), + pTable.getViewType() == ViewType.UPDATABLE); + views.put(name, func); + viewTables.add(pTable); + } + } catch (TableNotFoundException e) { + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return func == null ? 
Collections.<Function>emptyList() : ImmutableList.of(func); + } + + @Override + public Set<String> getFunctionNames() { + return views.keySet(); + } + + @Override + public Schema getSubSchema(String name) { + if (schemaName != null) { + return null; + } + + Schema schema = subSchemas.get(name); + if (schema != null) { + return schema; + } + + schema = new PhoenixSchema(name, name, parentSchema.getSubSchema(this.name), pc); + subSchemas.put(name, schema); + return schema; + } + + @Override + public Set<String> getSubSchemaNames() { + return subSchemas.keySet(); + } + + @Override + public Expression getExpression(SchemaPlus parentSchema, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isMutable() { + return true; + } + + @Override + public boolean contentsHaveChangedSince(long lastCheck, long now) { + return false; + } + + public void defineIndexesAsMaterializations() { + SchemaPlus schema = parentSchema.getSubSchema(this.name); + SchemaPlus viewSqlSchema = + this.schemaName == null ? 
schema : parentSchema; + CalciteSchema calciteSchema = CalciteSchema.from(schema); + List<String> path = CalciteSchema.from(viewSqlSchema).path(null); + try { + for (Table table : tables.values()) { + if (table instanceof PhoenixTable) { + PTable pTable = ((PhoenixTable) table).pTable; + for (PTable index : pTable.getIndexes()) { + addMaterialization(index, path, calciteSchema); + } + } + } + for (PTable pTable : viewTables) { + for (PTable index : pTable.getIndexes()) { + if (index.getParentName().equals(pTable.getName())) { + addMaterialization(index, path, calciteSchema); + } + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void addMaterialization(PTable index, List<String> path, + CalciteSchema calciteSchema) throws SQLException { + index = fixTableMultiTenancy(index); + StringBuffer sb = new StringBuffer(); + sb.append("SELECT"); + for (PColumn column : PhoenixTable.getMappedColumns(index)) { + String indexColumnName = column.getName().getString(); + String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); + sb.append(",").append(SchemaUtil.getEscapedFullColumnName(dataColumnName)); + sb.append(" ").append(SchemaUtil.getEscapedFullColumnName(indexColumnName)); + } + sb.setCharAt(6, ' '); // replace first comma with space. 
+ sb.append(" FROM ").append(SchemaUtil.getEscapedFullTableName(index.getParentName().getString())); + MaterializationService.instance().defineMaterialization( + calciteSchema, null, sb.toString(), path, index.getTableName().getString(), true, true); + } + + private boolean isView(PTable table) { + return table.getType() == PTableType.VIEW + && table.getViewType() != ViewType.MAPPED; + } + + private PTable fixTableMultiTenancy(PTable table) throws SQLException { + if (pc.getTenantId() != null || !table.isMultiTenant()) { + return table; + } + return PTableImpl.makePTable( + table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), + table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), PTableImpl.getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), + table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), + table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional()); ++ table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getTableStats(), table.getBaseColumnCount()); + } + + private PhoenixSequence resolveSequence(String name) { + try { + // FIXME: Do this the same way as resolving a table after PHOENIX-2489. + String tenantId = pc.getTenantId() == null ? null : pc.getTenantId().getString(); + String q = "select 1 from " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED + + " where " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + + (schemaName == null ? " is null" : " = '" + schemaName + "'") + + " and " + PhoenixDatabaseMetaData.SEQUENCE_NAME + + " = '" + name + "'" + + " and " + PhoenixDatabaseMetaData.TENANT_ID + + (tenantId == null ? 
" is null" : " = '" + tenantId + "'"); + ResultSet rs = pc.createStatement().executeQuery(q); + if (rs.next()) { + return new PhoenixSequence(schemaName, name, pc); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return null; + } + + /** Schema factory that creates a + * {@link org.apache.phoenix.calcite.PhoenixSchema}. + * This allows you to create a Phoenix schema inside a model.json file. + * + * <pre>{@code + * { + * version: '1.0', + * defaultSchema: 'HR', + * schemas: [ + * { + * name: 'HR', + * type: 'custom', + * factory: 'org.apache.phoenix.calcite.PhoenixSchema.Factory', + * operand: { + * url: "jdbc:phoenix:localhost", + * user: "scott", + * password: "tiger" + * } + * } + * ] + * } + * }</pre> + */ + public static class Factory implements SchemaFactory { + public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { + return PhoenixSchema.create(parentSchema, name, operand); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java index 44f3031,0000000..e1afc14 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java @@@ -1,191 -1,0 +1,191 @@@ +package org.apache.phoenix.calcite.rel; + +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Stack; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixSequence; +import org.apache.phoenix.calcite.PhoenixTable; +import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext; +import 
org.apache.phoenix.compile.ColumnProjector;
 +import org.apache.phoenix.compile.ExpressionProjector;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.SequenceValueExpression;
 +import org.apache.phoenix.compile.TupleProjectionCompiler;
 +import org.apache.phoenix.coprocessor.MetaDataProtocol;
 +import org.apache.phoenix.execute.RuntimeContext;
 +import org.apache.phoenix.execute.TupleProjector;
 +import org.apache.phoenix.expression.ColumnExpression;
 +import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
 +import org.apache.phoenix.expression.Expression;
 +import org.apache.phoenix.parse.ParseNodeFactory;
 +import org.apache.phoenix.parse.SequenceValueParseNode;
 +import org.apache.phoenix.parse.TableName;
 +import org.apache.phoenix.schema.ColumnRef;
 +import org.apache.phoenix.schema.KeyValueSchema;
 +import org.apache.phoenix.schema.PColumn;
 +import org.apache.phoenix.schema.PColumnImpl;
 +import org.apache.phoenix.schema.PName;
 +import org.apache.phoenix.schema.PNameFactory;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.PTableImpl;
 +import org.apache.phoenix.schema.PTableType;
 +import org.apache.phoenix.schema.TableRef;
 +import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 +import org.apache.phoenix.schema.types.PDataType;
 +import org.apache.phoenix.util.SchemaUtil;
 +
 +import com.google.common.collect.Lists;
 +/**
 + * {@link PhoenixRel.Implementor} that carries the mutable state used while
 + * {@link PhoenixRel} nodes are implemented as {@link QueryPlan}s: the runtime
 + * context, the current table reference together with its mapped columns, a
 + * stack of {@link ImplementorContext}s and a sequence manager.
 + *
 + * <p>Ordering matters: {@code mappedColumns} is only assigned by
 + * {@link #setTableRef(TableRef)} and {@code sequenceManager} only by
 + * {@link #setSequenceManager(SequenceManager)}, so the methods that read them
 + * must be called after the corresponding setter. No synchronization is
 + * performed anywhere in this class.
 + */
 +public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
 +    private final RuntimeContext runtimeContext;
 +    private TableRef tableRef; // current table; replaced by setTableRef() and by project()
 +    private List<PColumn> mappedColumns; // PhoenixTable.getMappedColumns(tableRef.getTable()), refreshed in setTableRef()
 +    private Stack<ImplementorContext> contextStack;
 +    private SequenceManager sequenceManager; // null until setSequenceManager() is called
 +    /**
 +     * Creates an implementor with an empty context stack. The table reference
 +     * and the sequence manager are left unset.
 +     *
 +     * @param runtimeContext context returned by {@link #getRuntimeContext()} and
 +     *                       used to build correlate-variable references
 +     */
 +    public PhoenixRelImplementorImpl(RuntimeContext runtimeContext) {
 +        this.runtimeContext = runtimeContext;
 +        this.contextStack = new Stack<ImplementorContext>();
 +    }
 +    /**
 +     * Implements a child node by calling {@code input.implement(this)}.
 +     *
 +     * @param i     ordinal of the input; not used by this implementation
 +     * @param input the child node to implement
 +     * @return the plan produced by the child
 +     */
 +    @Override
 +    public QueryPlan visitInput(int i, PhoenixRel input) {
 +        return input.implement(this);
 +    }
 +    /**
 +     * Builds a column expression for the mapped column at {@code index}.
 +     * The index addresses {@code mappedColumns}; the resulting
 +     * {@link ColumnRef} uses that column's own position in the current table.
 +     *
 +     * @param index index into the mapped-column list
 +     * @return a new expression for that column of the current table
 +     */
 +    @Override
 +    public ColumnExpression newColumnExpression(int index) {
 +        ColumnRef colRef = new ColumnRef(this.tableRef, this.mappedColumns.get(index).getPosition());
 +        return colRef.newColumnExpression();
 +    }
 +    /**
 +     * Builds an expression that accesses field {@code index} of the correlate
 +     * variable {@code variableId}, wrapping the reference obtained from the
 +     * runtime context in a {@link CorrelateVariableFieldAccessExpression}.
 +     *
 +     * @param variableId id of the correlate variable
 +     * @param index      field index within the variable
 +     * @param type       not used by this implementation
 +     */
 +    @SuppressWarnings("rawtypes")
 +    @Override
 +    public Expression newFieldAccessExpression(String variableId, int index, PDataType type) {
 +        Expression fieldAccessExpr = runtimeContext.newCorrelateVariableReference(variableId, index);
 +        return new CorrelateVariableFieldAccessExpression(runtimeContext, variableId, fieldAccessExpr);
 +    }
 +    /**
 +     * Registers a reference to {@code seq} with the sequence manager and returns
 +     * the resulting expression. The tenant id is read from the sequence's
 +     * connection ({@code seq.pc}); the table name is built from the sequence's
 +     * schema and sequence names. {@code null} is passed as the third argument
 +     * of {@code newSequenceReference}.
 +     *
 +     * @throws RuntimeException wrapping any {@link SQLException} raised by the
 +     *                          sequence manager
 +     */
 +    @Override
 +    public SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op) {
 +        PName tenantName = seq.pc.getTenantId();
 +        TableName tableName = TableName.create(seq.schemaName, seq.sequenceName);
 +        try {
 +            return sequenceManager.newSequenceReference(tenantName, tableName, null, op);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    /** Returns the runtime context supplied to the constructor. */
 +    @Override
 +    public RuntimeContext getRuntimeContext() {
 +        return runtimeContext;
 +    }
 +    /**
 +     * Sets the current table and recomputes {@code mappedColumns} from it via
 +     * {@code PhoenixTable.getMappedColumns}.
 +     */
 +    @Override
 +    public void setTableRef(TableRef tableRef) {
 +        this.tableRef = tableRef;
 +        this.mappedColumns = PhoenixTable.getMappedColumns(tableRef.getTable());
 +    }
 +    /** Returns the current table reference, or {@code null} if none has been set. */
 +    @Override
 +    public TableRef getTableRef() {
 +        return this.tableRef;
 +    }
 +    /** Sets the sequence manager used by {@link #newSequenceExpression}. */
 +    @Override
 +    public void setSequenceManager(SequenceManager sequenceManager) {
 +        this.sequenceManager = sequenceManager;
 +    }
 +    /** Pushes {@code context} so that it becomes the current context. */
 +    @Override
 +    public void pushContext(ImplementorContext context) {
 +        this.contextStack.push(context);
 +    }
 +    /**
 +     * Removes and returns the current context.
 +     * {@link Stack#pop()} throws {@code EmptyStackException} when nothing was pushed.
 +     */
 +    @Override
 +    public ImplementorContext popContext() {
 +        return contextStack.pop();
 +    }
 +    /**
 +     * Returns the current context without removing it.
 +     * {@link Stack#peek()} throws {@code EmptyStackException} when nothing was pushed.
 +     */
 +    @Override
 +    public ImplementorContext getCurrentContext() {
 +        return contextStack.peek();
 +    }
 +    /**
 +     * Creates a projected table over the current table. When the current
 +     * context has {@code retainPKColumns} set, every column of the table is
 +     * used as a source; otherwise only the mapped columns are. The same flag is
 +     * passed on to {@code TupleProjectionCompiler.createProjectedTable}.
 +     *
 +     * @throws RuntimeException wrapping any {@link SQLException} from the compiler
 +     */
 +    @Override
 +    public PTable createProjectedTable() {
 +        List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
 +        List<PColumn> columns = getCurrentContext().retainPKColumns ?
 +                  getTableRef().getTable().getColumns() : mappedColumns;
 +        for (PColumn column : columns) {
 +            sourceColumnRefs.add(new ColumnRef(getTableRef(), column.getPosition()));
 +        }
 +
 +        try {
 +            return TupleProjectionCompiler.createProjectedTable(getTableRef(), sourceColumnRefs, getCurrentContext().retainPKColumns);
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    /**
 +     * Builds a tuple projector over the mapped columns of the current table.
 +     * Primary-key columns are left out only when the current context has
 +     * {@code retainPKColumns} set; every other mapped column contributes one
 +     * field to the schema and one expression, in mapped-column order.
 +     */
 +    @Override
 +    public TupleProjector createTupleProjector() {
 +        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
 +        List<Expression> exprs = Lists.<Expression> newArrayList();
 +        for (PColumn column : mappedColumns) {
 +            if (!SchemaUtil.isPKColumn(column) || !getCurrentContext().retainPKColumns) {
 +                Expression expr = new ColumnRef(tableRef, column.getPosition()).newColumnExpression();
 +                exprs.add(expr);
 +                builder.addField(expr);
 +            }
 +        }
 +
 +        return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
 +    }
 +    /**
 +     * Builds a row projector with one {@link ExpressionProjector} per mapped
 +     * column, labelled with the column name and the current table's name.
 +     * The {@code 0} passed to the {@link RowProjector} constructor is
 +     * presumably the estimated row size that the TODO below refers to.
 +     */
 +    @Override
 +    public RowProjector createRowProjector() {
 +        List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList();
 +        for (int i = 0; i < mappedColumns.size(); i++) {
 +            PColumn column = mappedColumns.get(i);
 +            Expression expr = newColumnExpression(i); // Do not use column.position() here.
 +            columnProjectors.add(new ExpressionProjector(column.getName().getString(), getTableRef().getTable().getName().getString(), expr, false));
 +        }
 +        // TODO get estimate row size
 +        return new RowProjector(columnProjectors, 0, false);
 +    }
 +    /**
 +     * Projects {@code exprs} into a new temporary table and makes that table
 +     * the current one. One column is created per expression, named with a
 +     * fresh temp alias, placed in {@code TupleProjector.VALUE_COLUMN_FAMILY}
 +     * and typed from the expression. The columns form a
 +     * {@code PTableType.SUBQUERY} table.
 +     *
 +     * <p>Side effect: calls {@link #setTableRef(TableRef)}, so the previous
 +     * table reference and mapped columns are replaced.
 +     *
 +     * <p>The {@code -}/{@code ++} marker lines below are the merge resolution
 +     * recorded by this combined diff: the {@code PColumnImpl} constructor call
 +     * and the {@code makePTable} call each gained one trailing argument.
 +     *
 +     * @return a projector evaluating {@code exprs} in the given order
 +     * @throws RuntimeException wrapping any {@link SQLException} from table creation
 +     */
 +    @Override
 +    public TupleProjector project(List<Expression> exprs) {
 +        KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0);
 +        List<PColumn> columns = Lists.<PColumn>newArrayList();
 +        for (int i = 0; i < exprs.size(); i++) {
 +            String name = ParseNodeFactory.createTempAlias();
 +            Expression expr = exprs.get(i);
 +            builder.addField(expr);
 +            columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY),
 +                    expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
-                     i, expr.getSortOrder(), null, null, false, name, false));
++                    i, expr.getSortOrder(), null, null, false, name, false, false));
 +        }
 +        try {
 +            PTable pTable = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
 +                    PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
 +                    null, null, columns, null, null, Collections.<PTable>emptyList(),
 +                    false, Collections.<PName>emptyList(), null, null, false, false, false, null,
-                     null, null, true, false);
++                    null, null, true, false, 0);
 +            this.setTableRef(new TableRef(CalciteUtils.createTempAlias(), pTable, HConstants.LATEST_TIMESTAMP, false));
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +
 +        return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
 +    }
 +
 +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 64a9140,5fdec46..e7a00ed
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@@ -450,16 -443,6 +450,16 @@@ public class HashJoinPlan extends Deleg
                  return plan;
              }
          }
 +    /**
 +     * Applies {@code limit} to the delegate plan. If the delegate returns
 +     * itself (same instance), this plan is returned as is; otherwise a new
 +     * {@code HashJoinPlan} is built around the new delegate, reusing this
 +     * plan's statement, join info, sub-plans, {@code recompileWhereClause}
 +     * flag and dependencies.
 +     *
 +     * @param limit the limit handed to the delegate's {@code limit} method
 +     * @return this plan, or a new plan wrapping the limited delegate
 +     */
 +    @Override
 +    public QueryPlan limit(Integer limit) {
 +        QueryPlan delegate = this.delegate.limit(limit);
 +        if (delegate == this.delegate)
 +            return this;
 +
 +        return new HashJoinPlan(this.statement, delegate, this.joinInfo,
-                 this.subPlans, this.recompileWhereClause);
++                this.subPlans, this.recompileWhereClause, dependencies);
 +    }
  }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index b3235e2,b3235e2..51715f9
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@@ -143,7 -143,7 +143,7 @@@ public abstract class BaseResultIterato
          if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
              return false;
          }
--        return true;
++        return false;
      }
  
      private static void initializeScan(QueryPlan plan, Integer perScanLimit) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/74409be8/pom.xml ---------------------------------------------------------------------- diff --cc pom.xml index a1dab3c,4373753..fc642da --- a/pom.xml +++ b/pom.xml @@@ -109,9 -109,12 +109,12 @@@ <collections.version>3.2.1</collections.version> <jodatime.version>2.7</jodatime.version> <joni.version>2.1.2</joni.version> - <calcite.version>1.6.0</calcite.version> + <calcite.version>1.6.0-SNAPSHOT</calcite.version> <jettyVersion>8.1.7.v20120910</jettyVersion> <tephra.version>0.6.4</tephra.version> + <spark.version>1.5.2</spark.version> + <scala.version>2.10.4</scala.version> + <scala.binary.version>2.10</scala.binary.version> <!-- Test Dependencies --> <mockito-all.version>1.8.5</mockito-all.version>