PHOENIX-1620 Add API for getting tenant ID from an HBase row of a Phoenix table
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f3c675bf Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f3c675bf Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f3c675bf Branch: refs/heads/calcite Commit: f3c675bf735d7d4e4534433f3406af15360ed5d9 Parents: 2d77033 Author: James Taylor <jtay...@salesforce.com> Authored: Fri Feb 6 11:14:26 2015 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Fri Feb 6 14:59:01 2015 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/PhoenixRuntimeIT.java | 138 +++++++++++++++++++ .../org/apache/phoenix/util/PhoenixRuntime.java | 39 +++++- .../org/apache/phoenix/util/SchemaUtil.java | 4 + .../java/org/apache/phoenix/query/BaseTest.java | 20 ++- .../apache/phoenix/util/PhoenixRuntimeTest.java | 58 ++++++++ 5 files changed, 255 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f3c675bf/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRuntimeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRuntimeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRuntimeIT.java new file mode 100644 index 0000000..234ea1c --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRuntimeIT.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class PhoenixRuntimeIT extends BaseHBaseManagedTimeIT { + private static void assertTenantIds(Expression e, HTableInterface htable, Filter filter, String[] tenantIds) throws IOException { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + Scan scan = new Scan(); + scan.setFilter(filter); + ResultScanner scanner = htable.getScanner(scan); + Result result = null; + ResultTuple tuple = new ResultTuple(); + List<String> actualTenantIds = Lists.newArrayListWithExpectedSize(tenantIds.length); + List<String> expectedTenantIds = Arrays.asList(tenantIds); + while ((result = scanner.next()) != null) { + tuple.setResult(result); + e.evaluate(tuple, ptr); + String tenantId = (String)PVarchar.INSTANCE.toObject(ptr); + actualTenantIds.add(tenantId == null ? "" : tenantId); + } + // Need to sort because of salting + Collections.sort(actualTenantIds); + assertEquals(expectedTenantIds, actualTenantIds); + } + + @Test + public void testGetTenantIdExpressionForSaltedTable() throws Exception { + testGetTenantIdExpression(true); + } + + @Test + public void testGetTenantIdExpressionForUnsaltedTable() throws Exception { + testGetTenantIdExpression(false); + } + + private static Filter getUserTableAndViewsFilter() { + SingleColumnValueFilter tableFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.TABLE_TYPE_BYTES, CompareOp.EQUAL, Bytes.toBytes(PTableType.TABLE.getSerializedValue())); + tableFilter.setFilterIfMissing(true); + SingleColumnValueFilter viewFilter = new SingleColumnValueFilter(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.TABLE_TYPE_BYTES, CompareOp.EQUAL, Bytes.toBytes(PTableType.VIEW.getSerializedValue())); + viewFilter.setFilterIfMissing(true); + FilterList filter = new FilterList(FilterList.Operator.MUST_PASS_ONE, Arrays.asList(new Filter[] {tableFilter, viewFilter})); + return filter; + } + + private void testGetTenantIdExpression(boolean isSalted) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.setAutoCommit(true); + String tableName = "FOO_" + (isSalted ? "SALTED" : "UNSALTED"); + conn.createStatement().execute("CREATE TABLE " + tableName + " (k1 VARCHAR NOT NULL, k2 VARCHAR, CONSTRAINT PK PRIMARY KEY(K1,K2)) MULTI_TENANT=true" + (isSalted ? ",SALT_BUCKETS=3" : "")); + conn.createStatement().execute("CREATE SEQUENCE s1"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('t1','x')"); + conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('t2','y')"); + + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, "t1"); + Connection tsconn = DriverManager.getConnection(getUrl(), props); + tsconn.createStatement().execute("CREATE SEQUENCE s1"); + Expression e1 = PhoenixRuntime.getTenantIdExpression(tsconn, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME); + HTableInterface htable1 = tsconn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES); + assertTenantIds(e1, htable1, new FirstKeyOnlyFilter(), new String[] {"", "t1"} ); + + tsconn.createStatement().execute("CREATE VIEW A.BAR(V1 VARCHAR) AS SELECT * FROM " + tableName); + Expression e2 = PhoenixRuntime.getTenantIdExpression(tsconn, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + HTableInterface htable2 = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + assertTenantIds(e2, htable2, getUserTableAndViewsFilter(), new String[] {"", "t1"} ); + + Expression e3 = PhoenixRuntime.getTenantIdExpression(conn, tableName); + HTableInterface htable3 = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)); + assertTenantIds(e3, htable3, new FirstKeyOnlyFilter(), new String[] {"t1", "t2"} ); + + conn.createStatement().execute("CREATE TABLE BAS (k1 VARCHAR PRIMARY KEY)"); + Expression e4 = PhoenixRuntime.getTenantIdExpression(conn, "BAS"); + assertNull(e4); + + tsconn.createStatement().execute("CREATE INDEX I1 ON A.BAR(V1)"); + Expression e5 = PhoenixRuntime.getTenantIdExpression(tsconn, "A.I1"); + HTableInterface htable5 = tsconn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX + tableName)); + assertTenantIds(e5, htable5, new FirstKeyOnlyFilter(), new String[] {"t1"} ); + + conn.createStatement().execute("CREATE INDEX I2 ON " + tableName + "(k2)"); + Expression e6 = PhoenixRuntime.getTenantIdExpression(conn, "I2"); + HTableInterface htable6 = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("I2")); + assertTenantIds(e6, htable6, new FirstKeyOnlyFilter(), new String[] {"t1", "t2"} ); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f3c675bf/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index b2d7851..02a2776 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -28,6 +28,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -56,6 +57,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.OrderByExpression; +import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.query.QueryConstants; @@ -66,13 +68,15 @@ import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; +import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueBitSet; +import org.apache.phoenix.schema.types.PDataType; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; @@ -927,5 +931,38 @@ public class PhoenixRuntime { } return pColumn; } + + /** + * Get expression that may be used to evaluate the tenant ID of a given row in a + * multi-tenant table. Both the SYSTEM.CATALOG table and the SYSTEM.SEQUENCE + * table are considered multi-tenant. + * @param conn open Phoenix connection + * @param fullTableName full table name + * @return An expression that may be evaluated for a row in the provided table or + * null if the table is not a multi-tenant table. + * @throws SQLException if the table name is not found, a TableNotFoundException + * is thrown. If a multi-tenant local index is supplied a SQLFeatureNotSupportedException + * is thrown. + */ + public static Expression getTenantIdExpression(Connection conn, String fullTableName) throws SQLException { + PTable table = getTable(conn, fullTableName); + // TODO: consider setting MULTI_TENANT = true for SYSTEM.CATALOG and SYSTEM.SEQUENCE + if (!SchemaUtil.isMetaTable(table) && !SchemaUtil.isSequenceTable(table) && !table.isMultiTenant()) { + return null; + } + if (table.getIndexType() == IndexType.LOCAL) { + /* + * With some hackery, we could deduce the tenant ID from a multi-tenant local index, + * however it's not clear that we'd want to maintain the same prefixing of the region + * start key, as the region boundaries may end up being different on a cluster being + * replicated/backed-up to (which is the use case driving the method). + */ + throw new SQLFeatureNotSupportedException(); + } + + int pkPosition = table.getBucketNum() == null ? 0 : 1; + List<PColumn> pkColumns = table.getPKColumns(); + return new RowKeyColumnExpression(pkColumns.get(pkPosition), new RowKeyValueAccessor(pkColumns, pkPosition)); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f3c675bf/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index 9ab0692..afd61ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -376,6 +376,10 @@ public class SchemaUtil { return Bytes.compareTo(tableName, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES) == 0; } + public static boolean isSequenceTable(PTable table) { + return PhoenixDatabaseMetaData.SEQUENCE_FULLNAME.equals(table.getName().getString()); + } + public static boolean isMetaTable(PTable table) { return PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA.equals(table.getSchemaName().getString()) && PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE.equals(table.getTableName().getString()); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f3c675bf/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index f81c3a9..9947440 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -821,14 +821,28 @@ public abstract class BaseTest { } } - private static void deletePriorSequences(long ts, Connection conn) throws Exception { + private static void deletePriorSequences(long ts, Connection globalConn) throws Exception { // TODO: drop tenant-specific sequences too - ResultSet rs = conn.createStatement().executeQuery("SELECT " + ResultSet rs = globalConn.createStatement().executeQuery("SELECT " + + PhoenixDatabaseMetaData.TENANT_ID + "," + PhoenixDatabaseMetaData.SEQUENCE_SCHEMA + "," + PhoenixDatabaseMetaData.SEQUENCE_NAME + " FROM " + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_ESCAPED); + String lastTenantId = null; + Connection conn = globalConn; while (rs.next()) { - conn.createStatement().execute("DROP SEQUENCE " + SchemaUtil.getEscapedTableName(rs.getString(1), rs.getString(2))); + String tenantId = rs.getString(1); + if (tenantId != null && !tenantId.equals(lastTenantId)) { + if (lastTenantId != null) { + conn.close(); + } + // Open tenant-specific connection when we find a new one + Properties props = new Properties(globalConn.getClientInfo()); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + conn = DriverManager.getConnection(url, props); + lastTenantId = tenantId; + } + conn.createStatement().execute("DROP SEQUENCE " + SchemaUtil.getEscapedTableName(rs.getString(2), rs.getString(3))); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f3c675bf/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java index a642e80..c1f3244 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/PhoenixRuntimeTest.java @@ -20,6 +20,7 @@ package org.apache.phoenix.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -27,6 +28,7 @@ import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -34,7 +36,10 @@ import java.util.Properties; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.types.PDataType; import org.junit.Test; @@ -155,4 +160,57 @@ public class PhoenixRuntimeTest extends BaseConnectionlessQueryTest { fail("Failed sql: " + sb.toString() + ExceptionUtils.getStackTrace(e)); } } + + @Test + public void testGetTenantIdExpression() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + Expression e1 = PhoenixRuntime.getTenantIdExpression(conn, PhoenixDatabaseMetaData.SYSTEM_STATS_NAME); + assertNull(e1); + Expression e2 = PhoenixRuntime.getTenantIdExpression(conn, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + assertNotNull(e2); + + Expression e3 = PhoenixRuntime.getTenantIdExpression(conn, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME); + assertNotNull(e3); + + conn.createStatement().execute("CREATE TABLE FOO (k VARCHAR PRIMARY KEY)"); + Expression e4 = PhoenixRuntime.getTenantIdExpression(conn, "FOO"); + assertNull(e4); + + conn.createStatement().execute("CREATE TABLE A.BAR (k1 VARCHAR NOT NULL, k2 VARCHAR, CONSTRAINT PK PRIMARY KEY(K1,K2)) MULTI_TENANT=true"); + Expression e5 = PhoenixRuntime.getTenantIdExpression(conn, "A.BAR"); + assertNotNull(e5); + + conn.createStatement().execute("CREATE INDEX I1 ON A.BAR (K2)"); + Expression e5A = PhoenixRuntime.getTenantIdExpression(conn, "A.I1"); + assertNotNull(e5A); + + conn.createStatement().execute("CREATE TABLE BAS (k1 VARCHAR NOT NULL, k2 VARCHAR, CONSTRAINT PK PRIMARY KEY(K1,K2)) MULTI_TENANT=true, SALT_BUCKETS=3"); + Expression e6 = PhoenixRuntime.getTenantIdExpression(conn, "BAS"); + assertNotNull(e6); + + conn.createStatement().execute("CREATE INDEX I2 ON BAS (K2)"); + Expression e6A = PhoenixRuntime.getTenantIdExpression(conn, "I2"); + assertNotNull(e6A); + + try { + PhoenixRuntime.getTenantIdExpression(conn, "NOT.ATABLE"); + fail(); + } catch (TableNotFoundException e) { + // Expected + } + + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, "t1"); + Connection tsconn = DriverManager.getConnection(getUrl(), props); + tsconn.createStatement().execute("CREATE VIEW V(V1 VARCHAR) AS SELECT * FROM BAS"); + Expression e7 = PhoenixRuntime.getTenantIdExpression(tsconn, "V"); + assertNotNull(e7); + tsconn.createStatement().execute("CREATE LOCAL INDEX I3 ON V (V1)"); + try { + PhoenixRuntime.getTenantIdExpression(tsconn, "I3"); + fail(); + } catch (SQLFeatureNotSupportedException e) { + // Expected + } + } }