Repository: phoenix Updated Branches: refs/heads/calcite 1f43b8fc5 -> 4c8847987
PHOENIX-2523 Use same client/server caching mechanism for Phoenix/Calcite Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4c884798 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4c884798 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4c884798 Branch: refs/heads/calcite Commit: 4c88479871b6c6c569df9375a3a1ef16ca08fd51 Parents: 1f43b8f Author: maryannxue <maryann....@gmail.com> Authored: Mon Jan 4 22:01:19 2016 -0500 Committer: maryannxue <maryann....@gmail.com> Committed: Mon Jan 4 22:01:19 2016 -0500 ---------------------------------------------------------------------- .../apache/phoenix/calcite/BaseCalciteIT.java | 4 +- .../apache/phoenix/calcite/CalciteIndexIT.java | 14 +- .../calcite/jdbc/PhoenixCalciteFactory.java | 244 ++++++++++++++ .../apache/phoenix/calcite/PhoenixSchema.java | 333 +++++++++---------- .../calcite/jdbc/PhoenixCalciteDriver.java | 14 +- .../calcite/jdbc/PhoenixPrepareImpl.java | 35 +- 6 files changed, 441 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c884798/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java index d288429..55865f0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java @@ -357,9 +357,9 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT { protected static final String MULTI_TENANT_TABLE = "multitenant_test_table"; protected static final String MULTI_TENANT_TABLE_INDEX = "idx_multitenant_test_table"; - protected static final String MULTI_TENANT_VIEW1 = 
"multitenant_test_view1"; + protected static final String MULTI_TENANT_VIEW1 = "s1.multitenant_test_view1"; protected static final String MULTI_TENANT_VIEW1_INDEX = "idx_multitenant_test_view1"; - protected static final String MULTI_TENANT_VIEW2 = "multitenant_test_view2"; + protected static final String MULTI_TENANT_VIEW2 = "s2.multitenant_test_view2"; protected static final String MULTI_TENANT_VIEW2_INDEX = "idx_multitenant_test_view2"; protected void initMultiTenantTables(String index) throws SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c884798/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java index 88ebd52..1197193 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIndexIT.java @@ -247,18 +247,10 @@ public class CalciteIndexIT extends BaseCalciteIT { {"2", 3, 4, 5}}) .close(); - start(props).sql("select id, col0 from " + MULTI_TENANT_TABLE + " where col0 > 1") - .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + - " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_VIEW1]], filter=[>(CAST($0):INTEGER, 1)])\n") - .resultIs(new Object[][] { - {"2", 3}}) - .close(); - start(props).sql("select id, col0 from " + MULTI_TENANT_VIEW1 + " where col0 > 1") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + - " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_VIEW1]], filter=[>(CAST($0):INTEGER, 1)])\n") + " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[>(CAST($0):INTEGER, 1)])\n") .resultIs(new Object[][] { {"2", 3}}) .close(); @@ -274,7 +266,7 @@ 
public class CalciteIndexIT extends BaseCalciteIT { start(props).sql("select id, col0 from " + MULTI_TENANT_VIEW2 + " where col0 > 1") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + - " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_VIEW2]], filter=[>(CAST($0):INTEGER, 1)])\n") + " PhoenixTableScan(table=[[phoenix, S2, IDX_MULTITENANT_TEST_VIEW2]], filter=[>(CAST($0):INTEGER, 1)])\n") .resultIs(new Object[][] { {"5", 6}}) .close(); @@ -282,7 +274,7 @@ public class CalciteIndexIT extends BaseCalciteIT { start(props).sql("select id, col0 from " + MULTI_TENANT_VIEW2 + " order by col0") .explainIs("PhoenixToEnumerableConverter\n" + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + - " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_VIEW2]], scanOrder=[FORWARD])\n") + " PhoenixTableScan(table=[[phoenix, S2, IDX_MULTITENANT_TEST_VIEW2]], scanOrder=[FORWARD])\n") .resultIs(new Object[][] { {"5", 6}}) .close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c884798/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java b/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java new file mode 100644 index 0000000..e769b08 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java @@ -0,0 +1,244 @@ +package org.apache.calcite.jdbc; + +import java.io.InputStream; +import java.io.Reader; +import java.sql.NClob; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLXML; +import java.util.Properties; +import java.util.TimeZone; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.avatica.AvaticaConnection; +import 
org.apache.calcite.avatica.AvaticaDatabaseMetaData; +import org.apache.calcite.avatica.AvaticaFactory; +import org.apache.calcite.avatica.AvaticaPreparedStatement; +import org.apache.calcite.avatica.AvaticaResultSetMetaData; +import org.apache.calcite.avatica.AvaticaStatement; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.Meta.Signature; +import org.apache.calcite.avatica.Meta.StatementHandle; +import org.apache.calcite.avatica.QueryState; +import org.apache.calcite.avatica.UnregisteredDriver; +import org.apache.calcite.jdbc.CalciteConnectionImpl; +import org.apache.calcite.jdbc.CalciteFactory; +import org.apache.calcite.jdbc.Driver; + +public class PhoenixCalciteFactory extends CalciteFactory { + + public PhoenixCalciteFactory() { + this(4, 1); + } + + protected PhoenixCalciteFactory(int major, int minor) { + super(major, minor); + } + + public AvaticaConnection newConnection(UnregisteredDriver driver, + AvaticaFactory factory, String url, Properties info, + CalciteSchema rootSchema, JavaTypeFactory typeFactory) { + return new PhoenixCalciteConnection( + (Driver) driver, factory, url, info, rootSchema, typeFactory); + } + + @Override + public AvaticaDatabaseMetaData newDatabaseMetaData( + AvaticaConnection connection) { + return new PhoenixCalciteDatabaseMetaData( + (PhoenixCalciteConnection) connection); + } + + @Override + public AvaticaStatement newStatement(AvaticaConnection connection, + StatementHandle h, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return new PhoenixCalciteStatement((PhoenixCalciteConnection) connection, + h, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @SuppressWarnings("rawtypes") + @Override + public AvaticaPreparedStatement newPreparedStatement( + AvaticaConnection connection, StatementHandle h, + Signature signature, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return new 
PhoenixCalcitePreparedStatement( + (PhoenixCalciteConnection) connection, h, + (CalcitePrepare.CalciteSignature) signature, + resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CalciteResultSet newResultSet(AvaticaStatement statement, QueryState state, + Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame) { + final ResultSetMetaData metaData = + newResultSetMetaData(statement, signature); + @SuppressWarnings("rawtypes") + final CalcitePrepare.CalciteSignature calciteSignature = + (CalcitePrepare.CalciteSignature) signature; + return new CalciteResultSet(statement, calciteSignature, metaData, timeZone, + firstFrame); + } + + @Override + public ResultSetMetaData newResultSetMetaData(AvaticaStatement statement, + Meta.Signature signature) { + return new AvaticaResultSetMetaData(statement, null, signature); + } + + private static class PhoenixCalciteConnection extends CalciteConnectionImpl { + public PhoenixCalciteConnection(Driver driver, AvaticaFactory factory, String url, + Properties info, CalciteSchema rootSchema, + JavaTypeFactory typeFactory) { + super(driver, factory, url, info, + CalciteSchema.createRootSchema(true, false), typeFactory); + } + } + + private static class PhoenixCalciteStatement extends CalciteStatement { + public PhoenixCalciteStatement(PhoenixCalciteConnection connection, + Meta.StatementHandle h, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) { + super(connection, h, resultSetType, resultSetConcurrency, + resultSetHoldability); + } + } + + private static class PhoenixCalcitePreparedStatement extends CalcitePreparedStatement { + @SuppressWarnings("rawtypes") + PhoenixCalcitePreparedStatement(PhoenixCalciteConnection connection, + Meta.StatementHandle h, CalcitePrepare.CalciteSignature signature, + int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + super(connection, h, signature, resultSetType, resultSetConcurrency, + 
resultSetHoldability); + } + + public void setRowId( + int parameterIndex, + RowId x) throws SQLException { + getSite(parameterIndex).setRowId(x); + } + + public void setNString( + int parameterIndex, String value) throws SQLException { + getSite(parameterIndex).setNString(value); + } + + public void setNCharacterStream( + int parameterIndex, + Reader value, + long length) throws SQLException { + getSite(parameterIndex) + .setNCharacterStream(value, length); + } + + public void setNClob( + int parameterIndex, + NClob value) throws SQLException { + getSite(parameterIndex).setNClob(value); + } + + public void setClob( + int parameterIndex, + Reader reader, + long length) throws SQLException { + getSite(parameterIndex) + .setClob(reader, length); + } + + public void setBlob( + int parameterIndex, + InputStream inputStream, + long length) throws SQLException { + getSite(parameterIndex) + .setBlob(inputStream, length); + } + + public void setNClob( + int parameterIndex, + Reader reader, + long length) throws SQLException { + getSite(parameterIndex).setNClob(reader, length); + } + + public void setSQLXML( + int parameterIndex, SQLXML xmlObject) throws SQLException { + getSite(parameterIndex).setSQLXML(xmlObject); + } + + public void setAsciiStream( + int parameterIndex, + InputStream x, + long length) throws SQLException { + getSite(parameterIndex) + .setAsciiStream(x, length); + } + + public void setBinaryStream( + int parameterIndex, + InputStream x, + long length) throws SQLException { + getSite(parameterIndex) + .setBinaryStream(x, length); + } + + public void setCharacterStream( + int parameterIndex, + Reader reader, + long length) throws SQLException { + getSite(parameterIndex) + .setCharacterStream(reader, length); + } + + public void setAsciiStream( + int parameterIndex, InputStream x) throws SQLException { + getSite(parameterIndex).setAsciiStream(x); + } + + public void setBinaryStream( + int parameterIndex, InputStream x) throws SQLException { + 
getSite(parameterIndex).setBinaryStream(x); + } + + public void setCharacterStream( + int parameterIndex, Reader reader) throws SQLException { + getSite(parameterIndex) + .setCharacterStream(reader); + } + + public void setNCharacterStream( + int parameterIndex, Reader value) throws SQLException { + getSite(parameterIndex) + .setNCharacterStream(value); + } + + public void setClob( + int parameterIndex, + Reader reader) throws SQLException { + getSite(parameterIndex).setClob(reader); + } + + public void setBlob( + int parameterIndex, InputStream inputStream) throws SQLException { + getSite(parameterIndex) + .setBlob(inputStream); + } + + public void setNClob( + int parameterIndex, Reader reader) throws SQLException { + getSite(parameterIndex) + .setNClob(reader); + } + } + + /** Implementation of database metadata for JDBC 4.1. */ + private static class PhoenixCalciteDatabaseMetaData + extends AvaticaDatabaseMetaData { + PhoenixCalciteDatabaseMetaData(PhoenixCalciteConnection connection) { + super(connection); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c884798/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index bd98371..8b2c160 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@ -1,7 +1,6 @@ package org.apache.phoenix.calcite; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -20,16 +19,15 @@ import org.apache.phoenix.parse.TableName; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import 
org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTable.LinkType; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.SchemaUtil; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; @@ -56,147 +54,29 @@ public class PhoenixSchema implements Schema { protected final String name; protected final String schemaName; + protected final SchemaPlus parentSchema; protected final MetaDataClient client; - protected final Set<String> subSchemaNames; - protected final Map<String, PTable> tableMap; - protected final Map<String, ViewDef> viewDefMap; - protected final Map<String, Function> functionMap; - protected final Map<String, PhoenixSequence> sequenceMap; + protected final Map<String, Schema> subSchemas; + protected final Map<String, Table> tables; + protected final Map<String, Function> views; + protected final Set<PTable> viewTables; - private PhoenixSchema(String name, String schemaName, PhoenixConnection pc) { + protected PhoenixSchema(String name, String schemaName, + SchemaPlus parentSchema, PhoenixConnection pc) { this.name = name; this.schemaName = schemaName; + this.parentSchema = parentSchema; this.pc = pc; this.client = new MetaDataClient(pc); - this.tableMap = Maps.<String, PTable> newHashMap(); - this.viewDefMap = Maps.<String, ViewDef> newHashMap(); - this.functionMap = Maps.<String, Function> newHashMap(); - this.sequenceMap = Maps.<String, PhoenixSequence> newHashMap(); - loadTables(); - loadSequences(); - this.subSchemaNames = schemaName == null ? 
- ImmutableSet.<String> copyOf(loadSubSchemaNames()) - : Collections.<String> emptySet(); - } - - private Set<String> loadSubSchemaNames() { - try { - DatabaseMetaData md = pc.getMetaData(); - ResultSet rs = md.getSchemas(); - Set<String> subSchemaNames = Sets.newHashSet(); - while (rs.next()) { - String schemaName = rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM); - if (schemaName != null) { - subSchemaNames.add(schemaName); - } - } - // TODO FIXME: Remove this after PHOENIX-2489. - String tenantId = pc.getTenantId() == null ? null : pc.getTenantId().getString(); - String q = "select " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA - + " from " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED - + " where " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA - + " is not null" - + " and " + PhoenixDatabaseMetaData.TENANT_ID - + (tenantId == null ? " is null" : " = '" + tenantId + "'"); - rs = pc.createStatement().executeQuery(q); - while (rs.next()) { - String schemaName = rs.getString(1); - subSchemaNames.add(schemaName); - } - return subSchemaNames; - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - private void loadTables() { - try { - DatabaseMetaData md = pc.getMetaData(); - ResultSet rs = md.getTables(null, schemaName == null ? 
"" : schemaName, null, null); - while (rs.next()) { - String tableName = rs.getString(PhoenixDatabaseMetaData.TABLE_NAME); - String tableType = rs.getString(PhoenixDatabaseMetaData.TABLE_TYPE); - String viewType = rs.getString(PhoenixDatabaseMetaData.VIEW_TYPE); - if (!tableType.equals(PTableType.VIEW.getValue().getString()) - || ViewType.MAPPED.name().equals(viewType)) { - try { - ColumnResolver x = FromCompiler.getResolver( - NamedTableNode.create( - null, - TableName.create(schemaName, tableName), - ImmutableList.<ColumnDef>of()), pc); - final List<TableRef> tables = x.getTables(); - assert tables.size() == 1; - PTable pTable = tables.get(0).getTable(); - if (pc.getTenantId() == null && pTable.isMultiTenant()) { - pTable = fixTableMultiTenancy(pTable); - } - tableMap.put(tableName, pTable); - } catch (TableNotFoundException e) { - // Multi-tenant table with non-tenant-specific connection. - } - } else { - boolean isMultiTenant = rs.getBoolean(PhoenixDatabaseMetaData.MULTI_TENANT); - if (pc.getTenantId() != null || !isMultiTenant) { - String viewSql = rs.getString(PhoenixDatabaseMetaData.VIEW_STATEMENT); - if (viewSql == null) { - String q = "select " + PhoenixDatabaseMetaData.COLUMN_FAMILY - + " from " + PhoenixDatabaseMetaData.SYSTEM_CATALOG - + " where " + PhoenixDatabaseMetaData.TABLE_SCHEM - + (schemaName == null ? 
" is null" : " = '" + schemaName + "'") - + " and " + PhoenixDatabaseMetaData.TABLE_NAME - + " = '" + tableName + "'" - + " and " + PhoenixDatabaseMetaData.COLUMN_FAMILY - + " is not null" - + " and " + PhoenixDatabaseMetaData.LINK_TYPE - + " = " + LinkType.PHYSICAL_TABLE.getSerializedValue(); - ResultSet rs2 = pc.createStatement().executeQuery(q); - if (!rs2.next()) { - throw new SQLException("View link not found for " + tableName); - } - String parentTableName = rs2.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY); - viewSql = "select * from " + parentTableName; - } - viewDefMap.put(tableName, new ViewDef(viewSql, viewType.equals(ViewType.UPDATABLE.name()))); - } - } - } - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - private PTable fixTableMultiTenancy(PTable table) throws SQLException { - return PTableImpl.makePTable( - table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), - table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), PTableImpl.getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), - table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), - table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional()); - } - - private void loadSequences() { - try { - // TODO FIXME: Do this in loadTables() after PHOENIX-2489. - String tenantId = pc.getTenantId() == null ? null : pc.getTenantId().getString(); - String q = "select " + PhoenixDatabaseMetaData.SEQUENCE_NAME - + " from " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED - + " where " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA - + (schemaName == null ? 
" is null" : " = '" + schemaName + "'") - + " and " + PhoenixDatabaseMetaData.TENANT_ID - + (tenantId == null ? " is null" : " = '" + tenantId + "'"); - ResultSet rs = pc.createStatement().executeQuery(q); - while (rs.next()) { - String sequenceName = rs.getString(1); - sequenceMap.put(sequenceName, new PhoenixSequence(schemaName, sequenceName, pc)); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } + this.subSchemas = Maps.newHashMap(); + this.tables = Maps.newHashMap(); + this.views = Maps.newHashMap(); + this.viewTables = Sets.newHashSet(); } - private static Schema create(String name, Map<String, Object> operand) { + private static Schema create(SchemaPlus parentSchema, + String name, Map<String, Object> operand) { String url = (String) operand.get("url"); final Properties properties = new Properties(); for (Map.Entry<String, Object> entry : operand.entrySet()) { @@ -208,7 +88,7 @@ public class PhoenixSchema implements Schema { DriverManager.getConnection(url, properties); final PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class); - return new PhoenixSchema(name, null, phoenixConnection); + return new PhoenixSchema(name, null, parentSchema, phoenixConnection); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } catch (SQLException e) { @@ -218,41 +98,108 @@ public class PhoenixSchema implements Schema { @Override public Table getTable(String name) { - PTable table = tableMap.get(name); + Table table = tables.get(name); + if (table != null) { + return table; + } + + try { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create( + null, + TableName.create(schemaName, name), + ImmutableList.<ColumnDef>of()), pc); + final List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + PTable pTable = tables.get(0).getTable(); + if (!isView(pTable)) { + pTable = fixTableMultiTenancy(pTable); + table = new PhoenixTable(pc, pTable); + } + } catch (TableNotFoundException e) { + } 
catch (SQLException e) { + throw new RuntimeException(e); + } + + if (table == null) { + table = resolveSequence(name); + } + if (table != null) { - return new PhoenixTable(pc, table); + tables.put(name, table); } - PhoenixSequence sequence = sequenceMap.get(name); - return sequence; + return table; } @Override public Set<String> getTableNames() { - return Sets.union(tableMap.keySet(), sequenceMap.keySet()); + return tables.keySet(); } @Override public Collection<Function> getFunctions(String name) { - Function func = functionMap.get(name); + Function func = views.get(name); + if (func != null) { + return ImmutableList.of(func); + } + + try { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create( + null, + TableName.create(schemaName, name), + ImmutableList.<ColumnDef>of()), pc); + final List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + PTable pTable = tables.get(0).getTable(); + if (isView(pTable)) { + String viewSql = pTable.getViewStatement(); + if (viewSql == null) { + viewSql = "select * from " + + SchemaUtil.getEscapedFullTableName( + pTable.getPhysicalName().getString()); + } + SchemaPlus schema = parentSchema.getSubSchema(this.name); + SchemaPlus viewSqlSchema = + this.schemaName == null ? schema : parentSchema; + func = ViewTable.viewMacro(schema, viewSql, + CalciteSchema.from(viewSqlSchema).path(null), + pTable.getViewType() == ViewType.UPDATABLE); + views.put(name, func); + viewTables.add(pTable); + } + } catch (TableNotFoundException e) { + } catch (SQLException e) { + throw new RuntimeException(e); + } + return func == null ? 
Collections.<Function>emptyList() : ImmutableList.of(func); } @Override public Set<String> getFunctionNames() { - return viewDefMap.keySet(); + return views.keySet(); } @Override public Schema getSubSchema(String name) { - if (!subSchemaNames.contains(name)) + if (schemaName != null) { return null; + } + + Schema schema = subSchemas.get(name); + if (schema != null) { + return schema; + } - return new PhoenixSchema(name, name, pc); + schema = new PhoenixSchema(name, name, parentSchema.getSubSchema(this.name), pc); + subSchemas.put(name, schema); + return schema; } @Override public Set<String> getSubSchemaNames() { - return subSchemaNames; + return subSchemas.keySet(); } @Override @@ -270,49 +217,87 @@ public class PhoenixSchema implements Schema { return false; } - public void initFunctionMap(CalciteSchema calciteSchema) { - for (Map.Entry<String, ViewDef> entry : viewDefMap.entrySet()) { - ViewDef viewDef = entry.getValue(); - Function func = ViewTable.viewMacro( - calciteSchema.plus(), viewDef.viewSql, - calciteSchema.path(null), viewDef.updatable); - functionMap.put(entry.getKey(), func); - } - } - - public void defineIndexesAsMaterializations(CalciteSchema calciteSchema) { - List<String> path = calciteSchema.path(null); - for (PTable table : tableMap.values()) { - if (table.getType() == PTableType.INDEX) { - addMaterialization(table, path, calciteSchema); + public void defineIndexesAsMaterializations() { + SchemaPlus schema = parentSchema.getSubSchema(this.name); + SchemaPlus viewSqlSchema = + this.schemaName == null ? 
schema : parentSchema; + CalciteSchema calciteSchema = CalciteSchema.from(schema); + List<String> path = CalciteSchema.from(viewSqlSchema).path(null); + try { + for (Table table : tables.values()) { + if (table instanceof PhoenixTable) { + PTable pTable = ((PhoenixTable) table).pTable; + for (PTable index : pTable.getIndexes()) { + addMaterialization(index, path, calciteSchema); + } + } } + for (PTable pTable : viewTables) { + for (PTable index : pTable.getIndexes()) { + if (index.getParentName().equals(pTable.getName())) { + addMaterialization(index, path, calciteSchema); + } + } + } + } catch (SQLException e) { + throw new RuntimeException(e); } } - protected void addMaterialization(PTable index, List<String> path, - CalciteSchema calciteSchema) { + private void addMaterialization(PTable index, List<String> path, + CalciteSchema calciteSchema) throws SQLException { + index = fixTableMultiTenancy(index); StringBuffer sb = new StringBuffer(); sb.append("SELECT"); for (PColumn column : PhoenixTable.getMappedColumns(index)) { String indexColumnName = column.getName().getString(); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); - sb.append(",").append("\"").append(dataColumnName).append("\""); - sb.append(" ").append("\"").append(indexColumnName).append("\""); + sb.append(",").append(SchemaUtil.getEscapedFullColumnName(dataColumnName)); + sb.append(" ").append(SchemaUtil.getEscapedFullColumnName(indexColumnName)); } sb.setCharAt(6, ' '); // replace first comma with space. 
- sb.append(" FROM ").append("\"").append(index.getParentName().getString()).append("\""); + sb.append(" FROM ").append(SchemaUtil.getEscapedFullTableName(index.getParentName().getString())); MaterializationService.instance().defineMaterialization( calciteSchema, null, sb.toString(), path, index.getTableName().getString(), true, true); } - private static class ViewDef { - final String viewSql; - final boolean updatable; - - ViewDef(String viewSql, boolean updatable) { - this.viewSql = viewSql; - this.updatable = updatable; + private boolean isView(PTable table) { + return table.getType() == PTableType.VIEW + && table.getViewType() != ViewType.MAPPED; + } + + private PTable fixTableMultiTenancy(PTable table) throws SQLException { + if (pc.getTenantId() != null || !table.isMultiTenant()) { + return table; } + return PTableImpl.makePTable( + table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), + table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), PTableImpl.getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), + table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), + table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), + table.getTableStats(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional()); + } + + private PhoenixSequence resolveSequence(String name) { + try { + // FIXME: Do this the same way as resolving a table after PHOENIX-2489. + String tenantId = pc.getTenantId() == null ? null : pc.getTenantId().getString(); + String q = "select 1 from " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED + + " where " + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + + (schemaName == null ? 
" is null" : " = '" + schemaName + "'") + + " and " + PhoenixDatabaseMetaData.SEQUENCE_NAME + + " = '" + name + "'" + + " and " + PhoenixDatabaseMetaData.TENANT_ID + + (tenantId == null ? " is null" : " = '" + tenantId + "'"); + ResultSet rs = pc.createStatement().executeQuery(q); + if (rs.next()) { + return new PhoenixSequence(schemaName, name, pc); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return null; } /** Schema factory that creates a @@ -340,7 +325,7 @@ public class PhoenixSchema implements Schema { */ public static class Factory implements SchemaFactory { public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { - return PhoenixSchema.create(name, operand); + return PhoenixSchema.create(parentSchema, name, operand); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c884798/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixCalciteDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixCalciteDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixCalciteDriver.java index e219639..a103b30 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixCalciteDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixCalciteDriver.java @@ -41,7 +41,19 @@ public class PhoenixCalciteDriver extends Driver { @Override protected String getConnectStringPrefix() { return CONNECT_STRING_PREFIX; - } + } + + @Override protected String getFactoryClassName(JdbcVersion jdbcVersion) { + switch (jdbcVersion) { + case JDBC_30: + case JDBC_40: + throw new IllegalArgumentException("JDBC version not supported: " + + jdbcVersion); + case JDBC_41: + default: + return "org.apache.calcite.jdbc.PhoenixCalciteFactory"; + } + } public Connection connect(String url, Properties info) throws SQLException { if (!acceptsURL(url)) { 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c884798/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java index a956eee..afece22 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/jdbc/PhoenixPrepareImpl.java @@ -103,22 +103,27 @@ public class PhoenixPrepareImpl extends CalcitePrepareImpl { planner.addRule(new PhoenixForwardTableScanRule(PhoenixTemporarySort.class)); planner.addRule(new PhoenixReverseTableScanRule(LogicalSort.class)); planner.addRule(new PhoenixReverseTableScanRule(PhoenixTemporarySort.class)); - - for (CalciteSchema schema : prepareContext.getRootSchema().getSubSchemaMap().values()) { - if (schema.schema instanceof PhoenixSchema) { - PhoenixSchema phoenixSchema = (PhoenixSchema) schema.schema; - phoenixSchema.initFunctionMap(schema); - if (prepareContext.config().materializationsEnabled()) { - phoenixSchema.defineIndexesAsMaterializations(schema); - } - for (CalciteSchema subSchema : schema.getSubSchemaMap().values()) { - PhoenixSchema phoenixSubSchema = (PhoenixSchema) subSchema.schema; - phoenixSubSchema.initFunctionMap(subSchema); - if (prepareContext.config().materializationsEnabled()) { - phoenixSubSchema.defineIndexesAsMaterializations(subSchema); + + if (prepareContext.config().materializationsEnabled()) { + final CalciteSchema rootSchema = prepareContext.getRootSchema(); + Hook.TRIMMED.add(new Function<RelNode, Object>() { + boolean called = false; + @Override + public Object apply(RelNode root) { + if (!called) { + called = true; + for (CalciteSchema schema : rootSchema.getSubSchemaMap().values()) { + if (schema.schema instanceof PhoenixSchema) { + 
((PhoenixSchema) schema.schema).defineIndexesAsMaterializations(); + for (CalciteSchema subSchema : schema.getSubSchemaMap().values()) { + ((PhoenixSchema) subSchema.schema).defineIndexesAsMaterializations(); + } + } + } } - } - } + return null; + } + }); } Hook.PROGRAM.add(new Function<Pair<List<Materialization>, Holder<Program>>, Object>() {