http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java index 15452b7,0000000..1324717 mode 100644,000000..100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteLocalIndexIT.java @@@ -1,280 -1,0 +1,280 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.calcite; + +import static org.junit.Assert.fail; + +import java.sql.SQLException; +import java.util.Properties; + +import org.junit.Test; + +public class CalciteLocalIndexIT extends BaseCalciteIndexIT { + + public CalciteLocalIndexIT() { + super(true); + } + + @Test public void testIndex() throws Exception { + final Start start1000 = start(true, 1000f); + final Start start1000000 = start(true, 1000000f); + + start1000.sql("select * from aTable where b_string = 'b'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ORGANIZATION_ID=[$1], ENTITY_ID=[$2], A_STRING=[$3], B_STRING=[$0], A_INTEGER=[$4], A_DATE=[$5], A_TIME=[$6], A_TIMESTAMP=[$7], X_DECIMAL=[$8], X_LONG=[$9], X_INTEGER=[$10], Y_INTEGER=[$11], A_BYTE=[$12], A_SHORT=[$13], A_FLOAT=[$14], A_DOUBLE=[$15], A_UNSIGNED_FLOAT=[$16], A_UNSIGNED_DOUBLE=[$17])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_FULL]], filter=[=($0, 'b')])\n"); + start1000.sql("select x_integer from aTable") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(X_INTEGER=[$10])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n"); + /*.explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(X_INTEGER=[$4])\n" + + " PhoenixTableScan(table=[[phoenix, IDX1]])\n")*/ + start1000.sql("select a_string from aTable order by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(0:A_STRING=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, IDX1]], scanOrder=[FORWARD])\n"); + start1000000.sql("select a_string from aTable order by organization_id") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(A_STRING=[$2], ORGANIZATION_ID=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]], scanOrder=[FORWARD])\n"); + start1000.sql("select a_integer from aTable order by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerSort(sort0=[$1], dir0=[ASC])\n" + + " PhoenixServerProject(A_INTEGER=[$4], A_STRING=[$2])\n" + + " PhoenixTableScan(table=[[phoenix, ATABLE]])\n"); + /*.explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerSort(sort0=[$1], dir0=[ASC])\n" + + " PhoenixServerProject(A_INTEGER=[$4], A_STRING=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_FULL]])\n")*/ + start1000.sql("select a_string, b_string from aTable where a_string = 'a'") + .explainMatches("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject\\((0:)?A_STRING=\\[\\$0\\], (0:)?B_STRING=\\[\\$3\\]\\)\n" + + " PhoenixTableScan\\(table=\\[\\[phoenix, IDX1\\]\\], filter=\\[=\\(\\$0, 'a'\\)\\]\\)\n"); + start1000.sql("select a_string, b_string from aTable where b_string = 'b'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(A_STRING=[$3], B_STRING=[$0])\n" + + " PhoenixTableScan(table=[[phoenix, IDX2]], filter=[=($0, 'b')])\n"); + start1000.sql("select a_string, b_string, x_integer, y_integer from aTable where b_string = 'b'") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(A_STRING=[$3], B_STRING=[$0], X_INTEGER=[$10], Y_INTEGER=[$11])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_FULL]], filter=[=($0, 'b')])\n"); + start1000.sql("select a_string, count(*) from aTable group by a_string") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{0}], EXPR$1=[COUNT()], isOrdered=[true])\n" + + " PhoenixTableScan(table=[[phoenix, IDX1]], scanOrder=[FORWARD])\n"); + + start1000.close(); + start1000000.close(); + } + + @Test public void testSaltedIndex() throws Exception { + final Start start1 = start(true, 1f); + start1.sql("select count(*) from " + NOSALT_TABLE_NAME + " where col0 > 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n") + .resultIs(0, new Object[][]{{999L}}); + start1.sql("select mypk0, mypk1, col0 from " + NOSALT_TABLE_NAME + " where col0 <= 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_NOSALT_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n") + .resultIs(0, new Object[][] { + {2, 3, 4}, + {1, 2, 3}}); + start1.sql("select * from " + SALTED_TABLE_NAME + " where mypk0 < 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[<($0, 3)])\n") + .resultIs(0, new Object[][] { + {1, 2, 3, 4}, + {2, 3, 4, 5}}); + start1.sql("select count(*) from " + SALTED_TABLE_NAME + " where col0 > 3") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 3)])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][]{{999L}})*/; + start1.sql("select mypk0, mypk1, col0 from " + SALTED_TABLE_NAME + " where col0 <= 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 4)])\n") + .resultIs(0, new Object[][] { + {2, 3, 4}, + {1, 2, 3}}); + start1.sql("select count(*) from " + SALTED_TABLE_NAME + " where col1 > 4") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerAggregate(group=[{}], EXPR$0=[COUNT()])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 4)])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][]{{999L}})*/; + start1.sql("select * from " + SALTED_TABLE_NAME + " where col1 <= 5 order by col1") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 5)], scanOrder=[FORWARD])\n") + .resultIs(new Object[][] { + {1, 2, 3, 4}, + {2, 3, 4, 5}}); + start1.sql("select * from " + SALTED_TABLE_NAME + " s1, " + SALTED_TABLE_NAME + " s2 where s1.mypk1 = s2.mypk1 and s1.mypk0 > 500 and s2.col1 < 505") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerJoin(condition=[=($1, $5)], joinType=[inner])\n" + + " PhoenixTableScan(table=[[phoenix, SALTED_TEST_TABLE]], filter=[>($0, 500)])\n" + + " PhoenixServerProject(MYPK0=[$1], MYPK1=[$2], COL0=[$3], COL1=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, IDXSALTED_SALTED_TEST_TABLE]], filter=[<(CAST($0):INTEGER, 505)])\n") + .resultIs(0, new Object[][] { + {501, 502, 503, 504, 501, 502, 503, 504}}); + start1.close(); + } + + @Test public void testMultiTenant() throws Exception { + Properties props = getConnectionProps(true, 1f); + final Start start = start(props); + props = getConnectionProps(true, 1f); + props.setProperty("TenantId", "15"); + final Start startTenant15 = start(props); + props = getConnectionProps(true, 1f); + props.setProperty("TenantId", "10"); + final Start startTenant10 = start(props); + props = getConnectionProps(true, 1f); + props.setProperty("TenantId", "20"); + final Start startTenant20 = start(props); + + start.sql("select * from " + MULTI_TENANT_TABLE + " where tenant_id = '10' and id <= '0004'") + .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[AND(=(CAST($0):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '10'), <=($1, '0004'))])\n") ++ " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[AND(=($0, CAST('10'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL), <=($1, '0004'))])\n") + .resultIs(0, new Object[][] { + {"10", "0002", 3, 4, 5}, + {"10", "0003", 4, 5, 6}, + {"10", "0004", 5, 6, 7}}); + + start.sql("select * from " + MULTI_TENANT_TABLE + " where tenant_id = '20' and col1 < 8") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(TENANT_ID=[$0], ID=[$2], COL0=[$3], COL1=[CAST($1):INTEGER], COL2=[$4])\n" + - " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[AND(=(CAST($0):VARCHAR(2) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '20'), <(CAST($1):INTEGER, 8))])\n"); - // .resultIs(0, new Object[][] { - // {"20", "0004", 5, 6, 7}, - // {"20", "0005", 6, 7, 8}}); ++ " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[AND(=($0, CAST('20'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL), <(CAST($1):INTEGER, 8))])\n") ++ /*.resultIs(0, new Object[][] { ++ {"20", "0004", 5, 6, 7}, ++ {"20", "0005", 6, 7, 8}})*/; + + try { + start.sql("select * from " + MULTI_TENANT_VIEW1) + .explainIs(""); + fail("Should have got SQLException."); + } catch (SQLException e) { + } + + startTenant15.sql("select * from " + MULTI_TENANT_TABLE + " where id = '0284'") + .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[=(CAST($0):VARCHAR(4) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '0284')])\n") ++ " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[=($0, CAST('0284'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL)])\n") + .resultIs(0, new Object[][] { + {"0284", 285, 286, 287}}); + + startTenant15.sql("select * from " + MULTI_TENANT_TABLE + " where col1 > 1000") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[$2], COL1=[CAST($0):INTEGER], COL2=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[>(CAST($0):INTEGER, 1000)])\n") + .resultIs(0, new Object[][] { + {"0999", 1000, 1001, 1002}, + {"1000", 1001, 1002, 1003}, + {"1001", 1002, 1003, 1004}, + {"1002", 1003, 1004, 1005}}); + + try { + startTenant15.sql("select * from " + MULTI_TENANT_VIEW1) + .explainIs(""); + fail("Should have got SQLException."); + } catch (SQLException e) { + } + + startTenant10.sql("select * from " + MULTI_TENANT_VIEW1 + " where id = '0512'") + .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[=(CAST($0):VARCHAR(4) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '0512')])\n") ++ " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[=($0, CAST('0512'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL)])\n") + .resultIs(0, new Object[][] { + {"0512", 513, 514, 515}}); + + startTenant10.sql("select * from " + MULTI_TENANT_TABLE + " where col1 <= 6") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[$2], COL1=[CAST($0):INTEGER], COL2=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, IDX_MULTITENANT_TEST_TABLE]], filter=[<=(CAST($0):INTEGER, 6)])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][] { + {"0002", 3, 4, 5}, + {"0003", 4, 5, 6}, + {"0004", 5, 6, 7}})*/; + + startTenant10.sql("select id, col0 from " + MULTI_TENANT_VIEW1 + " where col0 >= 1000") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[>=(CAST($0):INTEGER, 1000)])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][] { + {"0999", 1000}, + {"1000", 1001}, + {"1001", 1002}})*/; + + startTenant10.sql("select * from " + MULTI_TENANT_VIEW1 + " where col0 = 1000") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER], COL1=[$2], COL2=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[=(CAST($0):INTEGER, 1000)], extendedColumns=[{2, 3}])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][] { + {"0999", 1000, 1001, 1002}})*/; + + startTenant10.sql("select id, col0, col2 from " + MULTI_TENANT_VIEW1 + " where col0 = 1000") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER], COL2=[$3])\n" + + " PhoenixTableScan(table=[[phoenix, S1, IDX_MULTITENANT_TEST_VIEW1]], filter=[=(CAST($0):INTEGER, 1000)], extendedColumns=[{3}])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][] { + {"0999", 1000, 1002}})*/; + + startTenant20.sql("select * from " + MULTI_TENANT_VIEW2 + " where id = '0765'") + .explainIs("PhoenixToEnumerableConverter\n" + - " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[AND(>($3, 7), =(CAST($0):VARCHAR(4) CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL, '0765'))])\n") ++ " PhoenixTableScan(table=[[phoenix, MULTITENANT_TEST_TABLE]], filter=[AND(>($3, 7), =($0, CAST('0765'):VARCHAR CHARACTER SET \"ISO-8859-1\" COLLATE \"ISO-8859-1$en_US$primary\" NOT NULL))])\n") + .resultIs(0, new Object[][] { + {"0765", 766, 767, 768}}); + + startTenant20.sql("select id, col0 from " + MULTI_TENANT_VIEW2 + " where col0 between 272 and 275") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, S2, IDX_MULTITENANT_TEST_VIEW2]], filter=[AND(>=(CAST($0):INTEGER, 272), <=(CAST($0):INTEGER, 275))])\n") + .sameResultAsPhoenixStandalone(0) + /*.resultIs(0, new Object[][] { + {"0271", 272}, + {"0272", 273}, + {"0273", 274}, + {"0274", 275}})*/; + + startTenant20.sql("select id, col0 from " + MULTI_TENANT_VIEW2 + " order by col0 limit 5") + .explainIs("PhoenixToEnumerableConverter\n" + + " PhoenixLimit(fetch=[5])\n" + + " PhoenixServerProject(ID=[$1], COL0=[CAST($0):INTEGER])\n" + + " PhoenixTableScan(table=[[phoenix, S2, IDX_MULTITENANT_TEST_VIEW2]], scanOrder=[FORWARD])\n") + .sameResultAsPhoenixStandalone() + /*.resultIs(new Object[][] { + {"0005", 6}, + {"0006", 7}, + {"0007", 8}, + {"0008", 9}, + {"0009", 10}})*/; + + start.close(); + startTenant15.close(); + startTenant10.close(); + startTenant20.close(); + } + +}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java index 1b3731c,152bdf0..4a779da --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseJoinIT.java @@@ -60,14 -60,14 +60,14 @@@ public abstract class BaseJoinIT extend " \"item_id\" varchar(10), " + " price integer, " + " quantity integer, " + - " \"DATE\" timestamp)"); - " date timestamp) IMMUTABLE_ROWS=true"); ++ " \"DATE\" timestamp) IMMUTABLE_ROWS=true"); builder.put(JOIN_CUSTOMER_TABLE_FULL_NAME, "create table " + JOIN_CUSTOMER_TABLE_FULL_NAME + " (\"customer_id\" varchar(10) not null primary key, " + " name varchar, " + " phone varchar(12), " + " address varchar, " + " loc_id varchar(5), " + - " \"DATE\" date)"); - " date date) IMMUTABLE_ROWS=true"); ++ " \"DATE\" date) IMMUTABLE_ROWS=true"); builder.put(JOIN_ITEM_TABLE_FULL_NAME, "create table " + JOIN_ITEM_TABLE_FULL_NAME + " (\"item_id\" varchar(10) not null primary key, " + " name varchar, " + http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java index 5e04281,0000000..f1d8048 mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java +++ b/phoenix-core/src/main/java/org/apache/calcite/jdbc/PhoenixCalciteFactory.java @@@ -1,738 -1,0 +1,710 @@@ +package org.apache.calcite.jdbc; + +import java.io.File; +import java.io.InputStream; +import java.io.Reader; +import java.sql.DatabaseMetaData; +import java.sql.Date; +import java.sql.NClob; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Time; +import java.sql.Timestamp; +import java.sql.ResultSet; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.TimeZone; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import org.apache.calcite.adapter.java.JavaTypeFactory; +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaDatabaseMetaData; +import org.apache.calcite.avatica.AvaticaFactory; +import org.apache.calcite.avatica.AvaticaPreparedStatement; +import org.apache.calcite.avatica.AvaticaResultSetMetaData; +import org.apache.calcite.avatica.AvaticaStatement; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.Meta.Signature; +import org.apache.calcite.avatica.Meta.StatementHandle; +import org.apache.calcite.avatica.remote.AvaticaHttpClientFactory; +import org.apache.calcite.avatica.remote.Service.Factory; +import org.apache.calcite.avatica.remote.TypedValue; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.avatica.util.Quoting; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.config.Lex; +import org.apache.calcite.config.NullCollation; +import org.apache.calcite.avatica.QueryState; +import org.apache.calcite.avatica.UnregisteredDriver; +import org.apache.calcite.jdbc.CalciteConnectionImpl; +import org.apache.calcite.jdbc.CalciteFactory; +import org.apache.calcite.jdbc.Driver; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.model.JsonSchema.Type; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.phoenix.calcite.CalciteUtils; +import org.apache.phoenix.calcite.PhoenixSchema; +import org.apache.phoenix.calcite.PhoenixSqlConformance; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.RuntimeContext; +import org.apache.phoenix.jdbc.PhoenixConnection; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; + +public class PhoenixCalciteFactory extends CalciteFactory { + + public PhoenixCalciteFactory() { + this(4, 1); + } + + protected PhoenixCalciteFactory(int major, int minor) { + super(major, minor); + } + + public AvaticaConnection newConnection(UnregisteredDriver driver, + AvaticaFactory factory, String url, Properties info, + CalciteSchema rootSchema, JavaTypeFactory typeFactory) { + return new PhoenixCalciteConnection( + (Driver) driver, factory, url, info, + CalciteSchema.createRootSchema(true, false), typeFactory); + } + + @Override + public AvaticaDatabaseMetaData newDatabaseMetaData( + AvaticaConnection connection) { + return new PhoenixCalciteDatabaseMetaData( + (PhoenixCalciteConnection) connection); + } + + @Override + public AvaticaStatement newStatement(AvaticaConnection connection, + StatementHandle h, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return new PhoenixCalciteStatement((PhoenixCalciteConnection) connection, + h, resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @SuppressWarnings("rawtypes") + @Override + public AvaticaPreparedStatement newPreparedStatement( + AvaticaConnection connection, StatementHandle h, + Signature signature, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) throws SQLException { + return new PhoenixCalcitePreparedStatement( + (PhoenixCalciteConnection) connection, h, + (CalcitePrepare.CalciteSignature) signature, + resultSetType, resultSetConcurrency, resultSetHoldability); + } + + @Override + public CalciteResultSet newResultSet(AvaticaStatement statement, QueryState state, + Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame) { + final ResultSetMetaData metaData = + newResultSetMetaData(statement, signature); + @SuppressWarnings("rawtypes") + final CalcitePrepare.CalciteSignature calciteSignature = + (CalcitePrepare.CalciteSignature) signature; + return new CalciteResultSet(statement, calciteSignature, metaData, timeZone, + firstFrame); + } + + @Override + public ResultSetMetaData newResultSetMetaData(AvaticaStatement statement, + Meta.Signature signature) { + return new AvaticaResultSetMetaData(statement, null, signature); + } + + private static class PhoenixCalciteConnection extends CalciteConnectionImpl { + private final Map<Meta.StatementHandle, ImmutableList<RuntimeContext>> runtimeContextMap = + new ConcurrentHashMap<Meta.StatementHandle, ImmutableList<RuntimeContext>>(); + + public PhoenixCalciteConnection(Driver driver, AvaticaFactory factory, String url, + Properties info, final CalciteSchema rootSchema, + JavaTypeFactory typeFactory) { + super(driver, factory, url, info, rootSchema, typeFactory); + } + + @Override + public CalciteConnectionConfig config() { + final CalciteConnectionConfig config = super.config(); + return new DelegateCalciteConnectionConfig(config) { + @Override + public SqlConformance conformance() { + return PhoenixSqlConformance.INSTANCE; + } + }; + } + + @Override + public CalciteStatement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + try { + return super.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); + } catch (SQLException e) { + throw CalciteUtils.unwrapSqlException(e); + } + } + + @Override + public CalcitePreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + try { + return super.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); + } catch (SQLException e) { + throw CalciteUtils.unwrapSqlException(e); + } + } + + public <T> Enumerable<T> enumerable(Meta.StatementHandle handle, + CalcitePrepare.CalciteSignature<T> signature) throws SQLException { + Map<String, Object> map = Maps.newLinkedHashMap(); + AvaticaStatement statement = lookupStatement(handle); + final List<TypedValue> parameterValues = + TROJAN.getParameterValues(statement); + final Calendar calendar = Calendar.getInstance(); + for (Ord<TypedValue> o : Ord.zip(parameterValues)) { + map.put("?" + o.i, o.e.toJdbc(calendar)); + } + ImmutableList<RuntimeContext> ctxList = runtimeContextMap.get(handle); + if (ctxList == null) { + List<RuntimeContext> activeCtx = RuntimeContext.THREAD_LOCAL.get(); + ctxList = ImmutableList.copyOf(activeCtx); + runtimeContextMap.put(handle, ctxList); + activeCtx.clear(); + } + for (RuntimeContext runtimeContext : ctxList) { + runtimeContext.setBindParameterValues(map); + } + return super.enumerable(handle, signature); + } + + @Override + public void abort(final Executor executor) throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.abort(executor); + }}); + } + + @Override + public void rollback() throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.rollback(); + }}); + } + + @Override + public void setReadOnly(final boolean readOnly) throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.setReadOnly(readOnly); + }}); + super.setReadOnly(readOnly); + } + + @Override + public void setTransactionIsolation(final int level) throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.setTransactionIsolation(level); + }}); + super.setTransactionIsolation(level); + } + + @Override + public void clearWarnings() throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.clearWarnings(); + }}); + super.clearWarnings(); + } + + @Override + public Savepoint setSavepoint() throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void rollback(final Savepoint savepoint) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + throw new SQLFeatureNotSupportedException(); + } + + public void setAutoCommit(final boolean isAutoCommit) throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.setAutoCommit(isAutoCommit);; + }}); + } + + public void commit() throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.commit(); + }}); + } + + public void close() throws SQLException { + call(new PhoenixConnectionCallable() { + @Override + public void call(PhoenixConnection conn) throws SQLException { + conn.close(); + }}); + super.close(); + } + + private void call(PhoenixConnectionCallable callable) throws SQLException { + for (String subSchemaName : getRootSchema().getSubSchemaNames()) { + try { + PhoenixSchema phoenixSchema = getRootSchema() + .getSubSchema(subSchemaName).unwrap(PhoenixSchema.class); + callable.call(phoenixSchema.pc); + } catch (ClassCastException e) { + } + } + } + + private static interface PhoenixConnectionCallable { + void call(PhoenixConnection conn) throws SQLException; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + PhoenixConnection pc = getPhoenixConnection(getRootSchema()); + if(pc != null) { + return pc.getMetaData(); + } + return super.getMetaData(); + } + + @Override + public Properties getClientInfo() throws SQLException { + PhoenixConnection pc = getPhoenixConnection(getRootSchema()); + if(pc != null) { + return pc.getClientInfo(); + } + return super.getClientInfo(); + } + + @Override + public String getClientInfo(String name) throws SQLException { + PhoenixConnection pc = getPhoenixConnection(getRootSchema()); + if(pc != null) { + return pc.getClientInfo(name); + } + return super.getClientInfo(name); + } + + private PhoenixConnection getPhoenixConnection(SchemaPlus rootSchema) { + for (String subSchemaName : getRootSchema().getSubSchemaNames()) { + try { + PhoenixSchema phoenixSchema = + getRootSchema().getSubSchema(subSchemaName).unwrap(PhoenixSchema.class); + return phoenixSchema.pc; + } catch (ClassCastException e) { + } + } + return null; + } + @SuppressWarnings("unchecked") + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + if (iface.isInstance(this)) { + return (T) this; + } + + if (iface.isAssignableFrom(PhoenixConnection.class)) { + SchemaPlus schema = getRootSchema().getSubSchema(this.getSchema()); + try { + return (T) (schema.unwrap(PhoenixSchema.class).pc); + } catch (ClassCastException e) { + } + } + + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE) + .setMessage(this.getClass().getName() + " not unwrappable from " + iface.getName()) + .build().buildException(); + } + } + + private static class PhoenixCalciteStatement extends CalciteStatement { + public PhoenixCalciteStatement(PhoenixCalciteConnection connection, + Meta.StatementHandle h, int resultSetType, int resultSetConcurrency, + int resultSetHoldability) { + super(connection, h, resultSetType, resultSetConcurrency, + resultSetHoldability); + } + + @Override + public boolean execute(String sql) throws SQLException { + try { + return super.execute(sql); + } catch (SQLException e) { + throw CalciteUtils.unwrapSqlException(e); + } + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException{ + try { + return super.executeQuery(sql); + } catch (SQLException e) { + throw CalciteUtils.unwrapSqlException(e); + } + } + } + + private static class PhoenixCalcitePreparedStatement extends CalcitePreparedStatement { + @SuppressWarnings("rawtypes") + PhoenixCalcitePreparedStatement(PhoenixCalciteConnection connection, + Meta.StatementHandle h, CalcitePrepare.CalciteSignature signature, + int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + super(connection, h, signature, resultSetType, resultSetConcurrency, + resultSetHoldability); + } + + @Override + public boolean execute(String sql) throws SQLException { + try { + return super.execute(sql); + } catch (SQLException e) { + throw CalciteUtils.unwrapSqlException(e); + } + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException{ + try { + return super.executeQuery(sql); + } catch (SQLException e) { + throw CalciteUtils.unwrapSqlException(e); + } + } + - public void setTimestamp(int parameterIndex, Timestamp x, Calendar calendar) - throws SQLException { - if (x != null) { - x = new Timestamp(getAdjustedTime(x.getTime(), calendar)); - } - super.setTimestamp(parameterIndex, x, calendar); - } - - public void setDate(int parameterIndex, Date x, Calendar calendar) - throws SQLException { - if (x != null) { - x = new Date(getAdjustedTime(x.getTime(), calendar)); - } - super.setDate(parameterIndex, x, calendar); - } - - public void setTime(int parameterIndex, Time x, Calendar calendar) - throws SQLException { - if (x != null) { - x = new Time(getAdjustedTime(x.getTime(), calendar)); - } - super.setTime(parameterIndex, x, calendar); - } - - private long getAdjustedTime(long v, Calendar calendar) { - return (v - calendar.getTimeZone().getOffset(v)); - } - + public void setRowId( + int parameterIndex, + RowId x) throws SQLException { + getSite(parameterIndex).setRowId(x); + } + + public void setNString( + int parameterIndex, String value) throws SQLException { + getSite(parameterIndex).setNString(value); + } + + public void setNCharacterStream( + int parameterIndex, + Reader value, + long length) throws SQLException { + getSite(parameterIndex) + .setNCharacterStream(value, length); + } + + public void setNClob( + int parameterIndex, + NClob value) throws SQLException { + getSite(parameterIndex).setNClob(value); + } + + public void setClob( + int parameterIndex, + Reader reader, + long length) throws SQLException { + getSite(parameterIndex) + .setClob(reader, length); + } + + public void setBlob( + int parameterIndex, + InputStream inputStream, + long length) throws SQLException { + getSite(parameterIndex) + .setBlob(inputStream, length); + } + + public void setNClob( + int parameterIndex, + Reader reader, + long length) throws SQLException { + getSite(parameterIndex).setNClob(reader, length); + } + + public void setSQLXML( + int parameterIndex, SQLXML xmlObject) throws SQLException { + getSite(parameterIndex).setSQLXML(xmlObject); + } + + public void setAsciiStream( + int parameterIndex, + InputStream x, + long length) throws SQLException { + getSite(parameterIndex) + .setAsciiStream(x, length); + } + + public void setBinaryStream( + int parameterIndex, + InputStream x, + long length) throws SQLException { + getSite(parameterIndex) + .setBinaryStream(x, length); + } + + public void setCharacterStream( + int parameterIndex, + Reader reader, + long length) throws SQLException { + getSite(parameterIndex) + .setCharacterStream(reader, length); + } + + public void setAsciiStream( + int parameterIndex, InputStream x) throws SQLException { + getSite(parameterIndex).setAsciiStream(x); + } + + public void setBinaryStream( + int parameterIndex, InputStream x) throws SQLException { + getSite(parameterIndex).setBinaryStream(x); + } + + public void setCharacterStream( + int parameterIndex, Reader reader) throws SQLException { + getSite(parameterIndex) + .setCharacterStream(reader); + } + + public void setNCharacterStream( + int parameterIndex, Reader value) throws SQLException { + getSite(parameterIndex) + .setNCharacterStream(value); + } + + public void setClob( + int parameterIndex, + Reader reader) throws SQLException { + getSite(parameterIndex).setClob(reader); + } + + public void setBlob( + int parameterIndex, InputStream inputStream) throws SQLException { + getSite(parameterIndex) + .setBlob(inputStream); + } + + public void setNClob( + int parameterIndex, Reader reader) throws SQLException { + getSite(parameterIndex) + .setNClob(reader); + } + } + + /** Implementation of database metadata for JDBC 4.1. */ + private static class PhoenixCalciteDatabaseMetaData + extends AvaticaDatabaseMetaData { + PhoenixCalciteDatabaseMetaData(PhoenixCalciteConnection connection) { + super(connection); + } + } + + private static class DelegateCalciteConnectionConfig implements CalciteConnectionConfig { + private final CalciteConnectionConfig delegate; + + DelegateCalciteConnectionConfig(CalciteConnectionConfig delegate) { + this.delegate = delegate; + } + + @Override + public String authentication() { + return delegate.authentication(); + } + + @Override + public String avaticaPassword() { + return delegate.avaticaPassword(); + } + + @Override + public String avaticaUser() { + return delegate.avaticaUser(); + } + + @Override + public Factory factory() { + return delegate.factory(); + } + + @Override + public String httpClientClass() { + return delegate.httpClientClass(); + } + + @Override + public AvaticaHttpClientFactory httpClientFactory() { + return delegate.httpClientFactory(); + } + + @Override + public File kerberosKeytab() { + return delegate.kerberosKeytab(); + } + + @Override + public String kerberosPrincipal() { + return delegate.kerberosPrincipal(); + } + + @Override + public String schema() { + return delegate.schema(); + } + + @Override + public String serialization() { + return delegate.serialization(); + } + + @Override + public String timeZone() { + return delegate.timeZone(); + } + + @Override + public String url() { + return delegate.url(); + } + + @Override + public boolean autoTemp() { + return delegate.autoTemp(); + } + + @Override + public boolean materializationsEnabled() { + return delegate.materializationsEnabled(); + } + + @Override + public boolean createMaterializations() { + return delegate.createMaterializations(); + } + + @Override + public NullCollation defaultNullCollation() { + return delegate.defaultNullCollation(); + } + + @Override + public <T> T fun(Class<T> operatorTableClass, T defaultOperatorTable) { + return delegate.fun(operatorTableClass, defaultOperatorTable); + } + + @Override + public String model() { + return delegate.model(); + } + + @Override + public Lex lex() { + return delegate.lex(); + } + + @Override + public Quoting quoting() { + return delegate.quoting(); + } + + @Override + public Casing unquotedCasing() { + return delegate.unquotedCasing(); + } + + @Override + public Casing quotedCasing() { + return delegate.quotedCasing(); + } + + @Override + public boolean caseSensitive() { + return delegate.caseSensitive(); + } + + @Override + public <T> T parserFactory(Class<T> parserFactoryClass, T defaultParserFactory) { + return delegate.parserFactory(parserFactoryClass, defaultParserFactory); + } + + @Override + public <T> T schemaFactory(Class<T> schemaFactoryClass, T defaultSchemaFactory) { + return delegate.schemaFactory(schemaFactoryClass, defaultSchemaFactory); + } + + @Override + public Type schemaType() { + return delegate.schemaType(); + } + + @Override + public boolean spark() { + return delegate.spark(); + } + + @Override + public boolean forceDecorrelate() { + return delegate.forceDecorrelate(); + } + + @Override + public <T> T typeSystem(Class<T> typeSystemClass, T defaultTypeSystem) { + return delegate.typeSystem(typeSystemClass, defaultTypeSystem); + } + + @Override + public SqlConformance conformance() { + return delegate.conformance(); + } + + @Override + public boolean approximateDistinctCount() { + return delegate.approximateDistinctCount(); + } + + @Override + public boolean approximateTopN() { + return delegate.approximateTopN(); + } + + @Override + public boolean approximateDecimal() { + return delegate.approximateDecimal(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/db78bd6f/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java ---------------------------------------------------------------------- diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java index d88f226,0000000..315556a mode 100644,000000..100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixSchema.java @@@ -1,579 -1,0 +1,579 @@@ +package org.apache.phoenix.calcite; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.collect.Multimap; +import com.google.common.collect.ArrayListMultimap; + +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.linq4j.tree.Expression; +import org.apache.calcite.materialize.MaterializationService; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.*; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.TableFunctionImpl; +import org.apache.calcite.schema.impl.ViewTable; +import org.apache.calcite.sql.ListJarsTable; +import org.apache.phoenix.compile.ColumnResolver; +import org.apache.phoenix.compile.FromCompiler; +import org.apache.phoenix.compile.SequenceManager; +import org.apache.phoenix.expression.function.FunctionExpression; +import org.apache.phoenix.expression.function.UDFExpression; +import org.apache.phoenix.expression.function.ByteBasedRegexpSplitFunction; +import org.apache.phoenix.expression.function.ByteBasedRegexpSubstrFunction; +import org.apache.phoenix.expression.function.ByteBasedRegexpReplaceFunction; +import org.apache.phoenix.expression.function.StringBasedRegexpSplitFunction; +import org.apache.phoenix.expression.function.StringBasedRegexpSubstrFunction; +import org.apache.phoenix.expression.function.StringBasedRegexpReplaceFunction; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.parse.ColumnDef; +import org.apache.phoenix.parse.NamedTableNode; +import org.apache.phoenix.parse.PFunction; +import org.apache.phoenix.parse.SequenceValueParseNode; +import org.apache.phoenix.parse.TableName; +import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; +import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunctionInfo; +import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunctionArgInfo; +import org.apache.phoenix.parse.FunctionParseNode.FunctionClassType; +import org.apache.phoenix.parse.ParseNodeFactory; +import org.apache.phoenix.parse.PFunction.FunctionArgument; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.schema.PTable.ViewType; +import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.Sequence; +import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PDataTypeFactory; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.SchemaUtil; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Implementation of Calcite's {@link Schema} SPI for Phoenix. + * + * TODO + * 1) change this to non-caching mode?? + * 2) how to deal with define indexes and views since they require a CalciteSchema + * instance?? + * + */ +public class PhoenixSchema implements Schema { + public static final Factory FACTORY = new Factory(); + + public final PhoenixConnection pc; + + protected final String name; + protected final String schemaName; + protected final SchemaPlus parentSchema; + protected final MetaDataClient client; + protected final SequenceManager sequenceManager; + protected final Map<String, Schema> subSchemas; + protected final Map<String, Table> tables; + protected final Map<String, Function> views; + protected final Set<TableRef> viewTables; + protected final UDFExpression exp = new UDFExpression(); + private final static Function listJarsFunction = TableFunctionImpl + .create(ListJarsTable.LIST_JARS_TABLE_METHOD); + private final Multimap<String, Function> builtinFunctions = ArrayListMultimap.create(); + private RelDataTypeFactory typeFactory; + + protected PhoenixSchema(String name, String schemaName, + SchemaPlus parentSchema, PhoenixConnection pc) { + this.name = name; + this.schemaName = schemaName; + this.parentSchema = parentSchema; + this.pc = pc; + this.client = new MetaDataClient(pc); + this.subSchemas = Maps.newHashMap(); + this.tables = Maps.newHashMap(); + this.views = Maps.newHashMap(); + this.views.put("ListJars", listJarsFunction); + this.viewTables = Sets.newHashSet(); + try { + PhoenixStatement stmt = (PhoenixStatement) pc.createStatement(); + this.sequenceManager = new SequenceManager(stmt); + } catch (SQLException e){ + throw new RuntimeException(e); + } + registerBuiltinFunctions(); + } + + protected PhoenixSchema(String name, String schemaName, + SchemaPlus parentSchema, PhoenixConnection pc, RelDataTypeFactory typeFactory) { + this(name, schemaName, parentSchema, pc); + this.setTypeFactory(typeFactory); + } + + private static Schema create(SchemaPlus parentSchema, + String name, Map<String, Object> operand) { + String url = (String) operand.get("url"); + final Properties properties = new Properties(); + for (Map.Entry<String, Object> entry : operand.entrySet()) { + properties.setProperty(entry.getKey(), String.valueOf(entry.getValue())); + } + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + final Connection connection = + DriverManager.getConnection(url, properties); + final PhoenixConnection phoenixConnection = + connection.unwrap(PhoenixConnection.class); + return new PhoenixSchema(name, null, parentSchema, phoenixConnection); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + + /** + * Registering Phoenix Builtin Functions in Calcite + * + * Registration of functions is coordinated from registerBuiltinFunctions() which builds the builtinFunctions map. + * The helper functions use mechanisms to convert Phoenix function information for use by Calcite. + * + * The builtinFunctions map is ultimately used by Calcite during validation and planning + * PhoenixSchema.getFunctions() - Matches function signatures during validation + * CalciteUtils.EXPRESSION_MAP - Maps a RexNode onto a builtin function + */ + private void registerBuiltinFunctions(){ + if(!builtinFunctions.isEmpty()) { + return; + } + final boolean useByteBasedRegex = + pc.getQueryServices().getProps().getBoolean(QueryServices.USE_BYTE_BASED_REGEX_ATTRIB, + QueryServicesOptions.DEFAULT_USE_BYTE_BASED_REGEX); + final List<String> ignoredRegexFunctions = useByteBasedRegex ? + Lists.newArrayList( + StringBasedRegexpReplaceFunction.class.getName(), + StringBasedRegexpSplitFunction.class.getName(), + StringBasedRegexpSubstrFunction.class.getName()) : + Lists.newArrayList( + ByteBasedRegexpReplaceFunction.class.getName(), + ByteBasedRegexpSubstrFunction.class.getName(), + ByteBasedRegexpSplitFunction.class.getName()); + + Multimap<String, BuiltInFunctionInfo> infoMap = ParseNodeFactory.getBuiltInFunctionMultimap(); + List<BuiltInFunctionInfo> aliasFunctions = Lists.newArrayList(); + for (BuiltInFunctionInfo info : infoMap.values()) { + //TODO: Support aggregate functions + if(!CalciteUtils.TRANSLATED_BUILT_IN_FUNCTIONS.contains(info.getName()) && !info.isAggregate()) { + if (info.getClassType() == FunctionClassType.ALIAS) { + aliasFunctions.add(info); + continue; + } + if(ignoredRegexFunctions.contains(info.getFunc().getName())){ + continue; + } + builtinFunctions.putAll(info.getName(), convertBuiltinFunction(info)); + } + } + // Single depth alias functions only + for(BuiltInFunctionInfo info : aliasFunctions) { + // Point the alias function to its derived functions + for (Class<? extends FunctionExpression> func : info.getDerivedFunctions()) { + BuiltInFunction d = func.getAnnotation(BuiltInFunction.class); + Collection<Function> targetFunction = builtinFunctions.get(d.name()); + + // Target function not implemented + if(targetFunction.isEmpty()) { + for(BuiltInFunctionInfo derivedInfo : infoMap.get(d.name())) { + targetFunction.addAll(convertBuiltinFunction(derivedInfo)); + } + } + builtinFunctions.putAll(info.getName(), targetFunction); + } + } + } + + // Converts phoenix function information to a list of Calcite function signatures + private static List<PhoenixScalarFunction> convertBuiltinFunction(BuiltInFunctionInfo functionInfo){ + List<List<FunctionArgument>> overloadedArgs = PhoenixSchema.overloadArguments(functionInfo.getArgs()); + List<PhoenixScalarFunction> functionList = Lists.newArrayListWithExpectedSize(overloadedArgs.size()); + Class<? extends FunctionExpression> clazz = functionInfo.getFunc(); + + try { + for (List<FunctionArgument> argumentList : overloadedArgs) { + List<FunctionParameter> parameters = Lists.newArrayListWithExpectedSize(argumentList.size()); + PDataType returnType = evaluateReturnType(clazz, argumentList); + + for (final FunctionArgument arg : argumentList) { + parameters.add( + new FunctionParameter() { + public int getOrdinal() { + return arg.getArgPosition(); + } + + public String getName() { + return "arg" + arg.getArgPosition(); + } + + @SuppressWarnings("rawtypes") + public RelDataType getType(RelDataTypeFactory typeFactory) { + PDataType dataType = + arg.isArrayType() ? PDataType.fromTypeId(PDataType.sqlArrayType(SchemaUtil + .normalizeIdentifier(SchemaUtil.normalizeIdentifier(arg + .getArgumentType())))) : PDataType.fromSqlTypeName(SchemaUtil + .normalizeIdentifier(arg.getArgumentType())); + return typeFactory.createJavaType(dataType.getJavaClass()); + } + + public boolean isOptional() { + return arg.getDefaultValue() != null; + } + }); + } + functionList.add(new PhoenixScalarFunction(functionInfo, parameters, returnType)); + } + } catch (Exception e){ + throw new RuntimeException("Builtin function " + functionInfo.getName() + " could not be registered", e); + } + return functionList; + } + + // Dynamically evaluates the return type of a built in function given a list of input arguments + private static PDataType evaluateReturnType(Class<? extends FunctionExpression> f, List<FunctionArgument> argumentList) { + BuiltInFunction d = f.getAnnotation(BuiltInFunction.class); + try { + // Direct evaluation of the return type + FunctionExpression func = f.newInstance(); + return func.getDataType(); + } catch (Exception e) { + if (d.classType() == FunctionClassType.ALIAS || d.classType() == FunctionClassType.ABSTRACT) { + // should never happen + throw new RuntimeException(); + } + // Grab the primary argument + assert(argumentList.size() != 0); + return PDataType.fromSqlTypeName(argumentList.get(0).getArgumentType()); + } + } + + // Using Phoenix argument information, determine all possible function signatures + private static List<List<PFunction.FunctionArgument>> overloadArguments(BuiltInFunctionArgInfo[] args){ + List<List<PFunction.FunctionArgument>> overloadedArgs = Lists.newArrayList(); + int solutions = 1; + for(int i = 0; i < args.length; solutions *= args[i].getAllowedTypes().length, i++); + for(int i = 0; i < solutions; i++) { + int j = 1; + short k = 0; + overloadedArgs.add(new ArrayList<PFunction.FunctionArgument>()); + for(BuiltInFunctionArgInfo arg : args) { + Class<? extends PDataType>[] temp = arg.getAllowedTypes(); + PDataType dataType = PDataTypeFactory.getInstance().instanceFromClass(temp[(i/j)%temp.length]); + overloadedArgs.get(i).add( new PFunction.FunctionArgument( + dataType.toString(), + dataType.isArrayType(), + arg.isConstant(), + arg.getDefaultValue(), + arg.getMinValue(), + arg.getMaxValue(), + k)); + k++; + j *= temp.length; + } + } + return overloadedArgs; + } + + @Override + public Table getTable(String name) { + Table table = tables.get(name); + if (table != null) { + return table; + } + + try { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create( + null, + TableName.create(schemaName, name), + ImmutableList.<ColumnDef>of()), pc); + final List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + TableRef tableRef = tables.get(0); + if (!isView(tableRef.getTable())) { + tableRef = fixTableMultiTenancy(tableRef); + table = new PhoenixTable(pc, tableRef, getTypeFactory()); + } + } catch (TableNotFoundException e) { + } catch (SQLException e) { + throw new RuntimeException(e); + } + + if (table == null) { + table = resolveSequence(name); + } + + if (table != null) { + tables.put(name, table); + } + return table; + } + + @Override + public Set<String> getTableNames() { + return tables.keySet(); + } + + @Override + public Collection<Function> getFunctions(String name) { + assert(!builtinFunctions.isEmpty()); + if(!builtinFunctions.get(name).isEmpty()){ + return builtinFunctions.get(name); + } + Function func = views.get(name); + if (func != null) { + return ImmutableList.of(func); + } + try { + List<String> functionNames = new ArrayList<String>(1); + functionNames.add(name); + ColumnResolver resolver = FromCompiler.getResolverForFunction(pc, functionNames); + List<PFunction> pFunctions = resolver.getFunctions(); + assert !pFunctions.isEmpty(); + List<Function> funcs = new ArrayList<Function>(pFunctions.size()); + for (PFunction pFunction : pFunctions) { + funcs.add(new PhoenixScalarFunction(pFunction)); + } + return ImmutableList.copyOf(funcs); + } catch (SQLException e) { + } + try { + ColumnResolver x = FromCompiler.getResolver( + NamedTableNode.create( + null, + TableName.create(schemaName, name), + ImmutableList.<ColumnDef>of()), pc); + final List<TableRef> tables = x.getTables(); + assert tables.size() == 1; + final TableRef tableRef = tables.get(0); + final PTable pTable = tableRef.getTable(); + if (isView(pTable)) { + String viewSql = pTable.getViewStatement(); + if (viewSql == null) { + viewSql = "select * from " + + SchemaUtil.getEscapedFullTableName( + pTable.getPhysicalName().getString()); + } + SchemaPlus schema = parentSchema.getSubSchema(this.name); + SchemaPlus viewSqlSchema = + this.schemaName == null ? schema : parentSchema; + func = ViewTable.viewMacro(schema, viewSql, + CalciteSchema.from(viewSqlSchema).path(null), + null, pTable.getViewType() == ViewType.UPDATABLE); + views.put(name, func); + viewTables.add(tableRef); + } + } catch (TableNotFoundException e) { + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return func == null ? Collections.<Function>emptyList() : ImmutableList.of(func); + } + + @Override + public Set<String> getFunctionNames() { + return views.keySet(); + } + + @Override + public Schema getSubSchema(String name) { + if (schemaName != null) { + return null; + } + + Schema schema = subSchemas.get(name); + if (schema != null) { + return schema; + } + + //TODO We should call FromCompiler.getResolverForSchema() here after + // all schemas are required to be explicitly created. + if (getTable(name) != null || !getFunctions(name).isEmpty()) { + return null; + } + schema = new PhoenixSchema(name, name, parentSchema.getSubSchema(this.name), pc, typeFactory); + subSchemas.put(name, schema); + return schema; + } + + @Override + public Set<String> getSubSchemaNames() { + return subSchemas.keySet(); + } + + @Override + public Expression getExpression(SchemaPlus parentSchema, String name) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isMutable() { + return true; + } + + @Override + public boolean contentsHaveChangedSince(long lastCheck, long now) { + return lastCheck != now; + } + + public void clear() { + tables.clear(); + views.clear(); + this.views.put("ListJars", listJarsFunction); + viewTables.clear(); + } + + public void defineIndexesAsMaterializations() { + SchemaPlus schema = parentSchema.getSubSchema(this.name); + SchemaPlus viewSqlSchema = + this.schemaName == null ? schema : parentSchema; + CalciteSchema calciteSchema = CalciteSchema.from(schema); + List<String> path = CalciteSchema.from(viewSqlSchema).path(null); + try { + List<PhoenixTable> phoenixTables = Lists.newArrayList(); + for (Table table : tables.values()) { + if (table instanceof PhoenixTable) { + phoenixTables.add((PhoenixTable) table); + } + } + for (PhoenixTable phoenixTable : phoenixTables) { + TableRef tableRef = phoenixTable.tableMapping.getTableRef(); + for (PTable index : tableRef.getTable().getIndexes()) { + TableRef indexTableRef = new TableRef(null, index, + tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), + false); + addMaterialization(indexTableRef, path, calciteSchema); + } + } + for (TableRef tableRef : viewTables) { + final PTable pTable = tableRef.getTable(); + for (PTable index : pTable.getIndexes()) { + if (index.getParentName().equals(pTable.getName())) { + TableRef indexTableRef = new TableRef(null, index, + tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), + false); + addMaterialization(indexTableRef, path, calciteSchema); + } + } + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + private void addMaterialization(TableRef indexTableRef, List<String> path, + CalciteSchema calciteSchema) throws SQLException { + indexTableRef = fixTableMultiTenancy(indexTableRef); + final PhoenixTable table = new PhoenixTable(pc, indexTableRef, getTypeFactory()); + final PTable index = indexTableRef.getTable(); + tables.put(index.getTableName().getString(), table); + StringBuffer sb = new StringBuffer(); + sb.append("SELECT"); + for (PColumn column : table.getColumns()) { + String indexColumnName = column.getName().getString(); + String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); + sb.append(",").append(SchemaUtil.getEscapedFullColumnName(dataColumnName)); + sb.append(" ").append(SchemaUtil.getEscapedFullColumnName(indexColumnName)); + } + sb.setCharAt(6, ' '); // replace first comma with space. + sb.append(" FROM ").append(SchemaUtil.getEscapedFullTableName(index.getParentName().getString())); + MaterializationService.instance().defineMaterialization( + calciteSchema, null, sb.toString(), path, index.getTableName().getString(), true, true); + } + + private boolean isView(PTable table) { + return table.getType() == PTableType.VIEW + && table.getViewType() != ViewType.MAPPED; + } + + private TableRef fixTableMultiTenancy(TableRef tableRef) throws SQLException { + if (pc.getTenantId() != null || !tableRef.getTable().isMultiTenant()) { + return tableRef; + } + PTable table = tableRef.getTable(); + table = PTableImpl.makePTable( + table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(), + table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), PTableImpl.getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), + table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), + table.isWALDisabled(), false, table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), + table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getBaseColumnCount(), table.getIndexDisableTimestamp(), - table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema()); ++ table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter()); + return new TableRef(null, table, tableRef.getTimeStamp(), + tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()); + } + + private PhoenixSequence resolveSequence(String name) { + try { + sequenceManager.newSequenceReference(pc.getTenantId(), + TableName.createNormalized(schemaName, name) , + null, SequenceValueParseNode.Op.NEXT_VALUE); + sequenceManager.validateSequences(Sequence.ValueOp.VALIDATE_SEQUENCE); + } catch (SQLException e){ + return null; + } finally { + sequenceManager.reset(); + } + + return new PhoenixSequence(schemaName, name, pc); + } + + public RelDataTypeFactory getTypeFactory() { + return typeFactory; + } + + public void setTypeFactory(RelDataTypeFactory typeFactory) { + this.typeFactory = typeFactory; + } + + /** Schema factory that creates a + * {@link org.apache.phoenix.calcite.PhoenixSchema}. + * This allows you to create a Phoenix schema inside a model.json file. + * + * <pre>{@code + * { + * version: '1.0', + * defaultSchema: 'HR', + * schemas: [ + * { + * name: 'HR', + * type: 'custom', + * factory: 'org.apache.phoenix.calcite.PhoenixSchema.Factory', + * operand: { + * url: "jdbc:phoenix:localhost", + * user: "scott", + * password: "tiger" + * } + * } + * ] + * } + * }</pre> + */ + public static class Factory implements SchemaFactory { + public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { + return PhoenixSchema.create(parentSchema, name, operand); + } + } +}