This is an automated email from the ASF dual-hosted git repository. yanxinyi pushed a commit to branch 4.x-HBase-1.5 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push: new 9a370bc PHOENIX-5601 Add a new Coprocessor - ViewTTLAware Coprocessor 9a370bc is described below commit 9a370bccd72e1c0cc91ce1004350b9164b249056 Author: Jacob Isaac <jis...@salesforce.com> AuthorDate: Fri Nov 15 08:58:15 2019 -0800 PHOENIX-5601 Add a new Coprocessor - ViewTTLAware Coprocessor Signed-off-by: Xinyi Yan <yanxi...@apache.org> --- .../java/org/apache/phoenix/end2end/ViewTTLIT.java | 1054 +++++++++++++++++++- .../phoenix/compile/ServerBuildIndexCompiler.java | 3 +- .../coprocessor/BaseScannerRegionObserver.java | 4 +- .../coprocessor/TTLAwareRegionObserver.java | 336 +++++++ .../phoenix/iterate/TableResultIterator.java | 3 +- .../phoenix/query/ConnectionQueryServicesImpl.java | 10 + .../java/org/apache/phoenix/util/IndexUtil.java | 100 +- .../java/org/apache/phoenix/util/ScanUtil.java | 147 +++ .../apache/phoenix/query/PhoenixTestBuilder.java | 338 ++++++- 9 files changed, 1819 insertions(+), 176 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java index ee77b33..18a2fa8 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java @@ -18,6 +18,8 @@ package org.apache.phoenix.end2end; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Result; @@ -31,19 +33,32 @@ import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; -import org.apache.phoenix.query.PhoenixTestBuilder; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder; +import org.apache.phoenix.query.PhoenixTestBuilder.BasicDataReader; +import org.apache.phoenix.query.PhoenixTestBuilder.BasicDataWriter; +import org.apache.phoenix.query.PhoenixTestBuilder.DataReader; +import org.apache.phoenix.query.PhoenixTestBuilder.DataWriter; +import org.apache.phoenix.query.PhoenixTestBuilder.DataSupplier; +import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.OtherOptions; import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions; +import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions; +import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions; import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewOptions; import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Connection; @@ -51,20 +66,26 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; - +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.Random; +import java.util.Set; + +import static java.util.Arrays.asList; +import static org.apache.phoenix.query.PhoenixTestBuilder.DDLDefaults.MAX_ROWS; import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class ViewTTLIT extends ParallelStatsDisabledIT { - private static final Logger LOGGER = LoggerFactory.getLogger(ViewTTLIT.class); - private static final String ORG_ID_FMT = "00D0x000%s"; - private static final String ID_FMT = "00A0y000%07d"; private static final String VIEW_TTL_HEADER_SQL = "SELECT VIEW_TTL FROM SYSTEM.CATALOG " + "WHERE %s AND TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND TABLE_TYPE = '%s'"; private static final String ALTER_VIEW_TTL_SQL = "ALTER VIEW %s.%s set VIEW_TTL=%d"; + private static final int DEFAULT_NUM_ROWS = 5; // Scans the HBase rows directly for the view ttl related header rows column and asserts private void assertViewHeaderRowsHaveViewTTLRelatedCells(String schemaName, long minTimestamp, @@ -96,7 +117,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { assertEquals(String.format("Expected rows do not match for table = %s at timestamp %d", Bytes.toString(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES), minTimestamp), expectedRows, numMatchingRows); } - } private void assertSyscatHaveViewTTLRelatedColumns(String tenantId, String schemaName, String tableName, String tableType, long ttlValueExpected) @@ -119,7 +139,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { return name.replace("\"", ""); } - /** * ----------------- * Test methods @@ -134,7 +153,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { // Define the test schema. // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID) - final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); schemaBuilder .withTableDefaults() .withGlobalViewDefaults() @@ -145,15 +164,13 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, true, 2); } - - @Test public void testViewTTLWithTableLevelTTLFails() throws Exception { // Define the test schema. // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) - final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); TableOptions tableOptions = TableOptions.withDefaults(); tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true,TTL=100"); @@ -177,7 +194,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { // Define the test schema. // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) - final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); TableOptions tableOptions = TableOptions.withDefaults(); tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); @@ -203,7 +220,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { // Define the test schema. // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) - final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); TableOptions tableOptions = TableOptions.withDefaults(); tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); @@ -238,18 +255,18 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID) // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) - final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); TableOptions tableOptions = TableOptions.withDefaults(); tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); - PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions - globalViewOptions = PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults(); + SchemaBuilder.GlobalViewOptions + globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); globalViewOptions.setTableProps("VIEW_TTL=300000"); - PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions + SchemaBuilder.GlobalViewIndexOptions globalViewIndexOptions = - PhoenixTestBuilder.SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); globalViewIndexOptions.setLocal(false); TenantViewOptions tenantViewWithOverrideOptions = TenantViewOptions.withDefaults(); @@ -274,7 +291,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 4); assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, PTableType.VIEW.getSerializedValue(), 300000); assertSyscatHaveViewTTLRelatedColumns("", schemaName, indexOnGlobalViewName, PTableType.INDEX.getSerializedValue(), 300000); - // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view. + // Since the VIEW_TTL property values are being overriden, we expect the TTL value to be different from the global view. assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 1000); assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000); @@ -315,7 +332,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { // Define the test schema. // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) - final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); TableOptions tableOptions = TableOptions.withDefaults(); tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); @@ -341,7 +358,6 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view. assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 0); assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 0); - } @Test @@ -351,7 +367,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { // Define the test schema. // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) // 2. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) - final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); TableOptions tableOptions = TableOptions.withDefaults(); tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); @@ -391,7 +407,997 @@ public class ViewTTLIT extends ParallelStatsDisabledIT { // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view. assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 1000); assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, indexOnTenantViewName, PTableType.INDEX.getSerializedValue(), 1000); + } + + @Test + public void testViewTTLForLevelTwoViewWithNoIndexes() throws Exception { + long startTime = System.currentTimeMillis(); + + // Define the test schema. + // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) + // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID) + // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); + + SchemaBuilder.GlobalViewOptions + globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps("VIEW_TTL=300000"); + + TenantViewOptions tenantViewWithOverrideOptions = TenantViewOptions.withDefaults(); + tenantViewWithOverrideOptions.setTableProps("VIEW_TTL=10000"); + schemaBuilder + .withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withTenantViewOptions(tenantViewWithOverrideOptions) + .buildWithNewTenant(); + + String tenantId = schemaBuilder.getDataOptions().getTenantId(); + String schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName())); + String globalViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName())); + String tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName())); + + // Expected 2 rows - one for GlobalView, one for TenantView each. + // Since the VIEW_TTL property values are being set, we expect the view header columns to show up in regular scans too. + assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 2); + assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, PTableType.VIEW.getSerializedValue(), 300000); + // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be different from the global view. + assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 10000); + + // Without override + startTime = System.currentTimeMillis(); + + TenantViewOptions tenantViewWithoutOverrideOptions = TenantViewOptions.withDefaults(); + schemaBuilder + .withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withTenantViewOptions(tenantViewWithoutOverrideOptions) + .buildWithNewTenant(); + + tenantId = schemaBuilder.getDataOptions().getTenantId(); + schemaName = stripQuotes(SchemaUtil.getSchemaNameFromFullName(schemaBuilder.getEntityTenantViewName())); + globalViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityGlobalViewName())); + tenantViewName = stripQuotes(SchemaUtil.getTableNameFromFullName(schemaBuilder.getEntityTenantViewName())); + + // Expected 1 rows - one for TenantView each. + // Since the VIEW_TTL property values are being set, we expect the view header columns to show up in regular scans too. + assertViewHeaderRowsHaveViewTTLRelatedCells(schemaBuilder.getTableOptions().getSchemaName(), startTime, false, 1); + assertSyscatHaveViewTTLRelatedColumns("", schemaName, globalViewName, PTableType.VIEW.getSerializedValue(), 300000); + // Since the VIEW_TTL property values are not being overriden, we expect the TTL value to be same as the global view. + assertSyscatHaveViewTTLRelatedColumns(tenantId, schemaName, tenantViewName, PTableType.VIEW.getSerializedValue(), 300000); + } + + @Test + public void testWithTenantViewAndNoGlobalView() throws Exception { + + long viewTTL = 10000; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults(); + tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder + .withTableOptions(tableOptions) + .withTenantViewOptions(tenantViewOptions) + .build(); + + // Define the test data. + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00A0y000%07d", rowIndex); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + return Lists.newArrayList(new Object[] { zid, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ZID"); + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + try (Connection writeConnection = DriverManager + .getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + dataReader.setValidationColumns(columns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT %s from %s", + Joiner.on(",").join(columns), + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, dataWriter, + dataReader, schemaBuilder); + } + } + + @Test public void testWithSQLUsingIndex() throws Exception { + + long viewTTL = 10000; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + GlobalViewOptions + globalViewOptions = + SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL)); + + GlobalViewIndexOptions + globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions + .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL)); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null, null, null, null)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder + .withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault) + .build(); + + + // Define the test data. + final List<String> outerCol4s = Lists.newArrayList(); + DataSupplier dataSupplier = new DataSupplier() { + String col4ForWhereClause; + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format("00A0y000%07d", rowIndex); + String zid = String.format("00B0y000%07d", rowIndex); + String + col4 = + String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + // Store the col4 data to be used later in a where clause + outerCol4s.add(col4); + String + col5 = + String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col6 = + String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col7 = + String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col8 = + String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col9 = + String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { id, zid, col4, col5, col6, col7, + col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = Lists.newArrayList("ID", "ZID", + "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("COL6"); + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + try (Connection writeConnection = DriverManager + .getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Upsert data for validation + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + dataReader.setValidationColumns(rowKeyColumns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT col6 from %s where col4 = '%s'", + schemaBuilder.getEntityTenantViewName(), outerCol4s.get(1))); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, schemaBuilder); + } } + @Test public void testWithVariousSQLs() throws Exception { + + long viewTTL = 10000; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + GlobalViewOptions + globalViewOptions = + SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL)); + + GlobalViewIndexOptions + globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions + .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL)); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null, null, null, null)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder + .withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault) + .build(); + + + // Define the test data. + final String groupById = String.format("00A0y000%07d", 0); + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00B0y000%07d", rowIndex); + String + col4 = + String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col5 = + String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col6 = + String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col7 = + String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col8 = + String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col9 = + String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { groupById, zid, col4, col5, col6, col7, + col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + List<String> columns = Lists.newArrayList("ID", "ZID", + "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID"); + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + try (Connection writeConnection = DriverManager + .getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + // Case : count(1) sql + DataReader dataReader = new BasicDataReader(); + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String.format("SELECT count(1) as num_rows from %s HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, schemaBuilder); + + // Case : group by sql + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String.format("SELECT count(1) as num_rows from %s GROUP BY ID HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName(), + groupById)); + + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + validateExpiredRowsAreNotReturnedUsingCounts(viewTTL, dataReader, schemaBuilder); + } + } + + @Test + public void testWithTenantViewAndGlobalViewAndVariousOptions() throws Exception { + long viewTTL = 10000; + + // Define the test schema + TableOptions tableOptions = TableOptions.withDefaults(); + + GlobalViewOptions + globalViewOptions = + SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL)); + + GlobalViewIndexOptions + globalViewIndexOptions = + GlobalViewIndexOptions.withDefaults(); + + TenantViewOptions tenantViewOptions = new TenantViewOptions(); + tenantViewOptions + .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL)); + + TenantViewIndexOptions + tenantViewIndexOptions = + TenantViewIndexOptions.withDefaults(); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null, null, null, null)); + + for (String additionalProps : Lists + .newArrayList("COLUMN_ENCODED_BYTES=0", "DEFAULT_COLUMN_FAMILY='0'")) { + + StringBuilder withTableProps = new StringBuilder(); + withTableProps.append("MULTI_TENANT=true,").append(additionalProps); + + for (boolean isGlobalViewLocal : Lists.newArrayList(true, false)) { + for (boolean isTenantViewLocal : Lists.newArrayList(true, false)) { + + tableOptions.setTableProps(withTableProps.toString()); + globalViewIndexOptions.setLocal(isGlobalViewLocal); + tenantViewIndexOptions.setLocal(isTenantViewLocal); + OtherOptions otherOptions = testCaseWhenAllCFMatchAndAllDefault; + + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder + .withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewOptions) + .withTenantViewIndexOptions(tenantViewIndexOptions) + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault) + .buildWithNewTenant(); + + // Define the test data. + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String id = String.format("00A0y000%07d", rowIndex); + String zid = String.format("00B0y000%07d", rowIndex); + String + col1 = + String.format("a%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col2 = + String.format("b%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col3 = + String.format("c%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col4 = + String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col5 = + String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col6 = + String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col7 = + String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col8 = + String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col9 = + String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { id, zid, col1, col2, col3, col4, col5, col6, col7, + col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = Lists.newArrayList("ID", "ZID", "COL1", "COL2", + "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID"); + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder + .getDataOptions().getTenantId(); + try (Connection writeConnection = DriverManager + .getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + dataReader.setValidationColumns(columns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT %s from %s", + Joiner.on(",").join(columns), + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, + dataWriter, dataReader, schemaBuilder); + } + + + long scnTimestamp = System.currentTimeMillis()+viewTTL; + // Delete data by simulating expiration. + deleteData(schemaBuilder, scnTimestamp); + + // Verify after deleting TTL expired data. + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp)); + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> fetchedData = + fetchData(dataReader); + assertTrue("Expired rows should not be fetched", + fetchedData.rowKeySet().size() == 0); + } + } + } + } + } + + @Test + public void testGlobalAndTenantViewTTLInheritance() throws Exception { + long globalViewTTL = 300000; + long tenantViewTTL = 30000; + + // Define the test schema. + // 1. Table with default columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) + // 2. GlobalView with default columns => (ID, COL4, COL5, COL6), PK => (ID) + // 3. Tenant with default columns => (ZID, COL7, COL8, COL9), PK => (ZID) + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.setTableProps("COLUMN_ENCODED_BYTES=0,MULTI_TENANT=true"); + + SchemaBuilder.GlobalViewOptions + globalViewOptions = SchemaBuilder.GlobalViewOptions.withDefaults(); + globalViewOptions.setTableProps(String.format("VIEW_TTL=%d", globalViewTTL)); + + SchemaBuilder.GlobalViewIndexOptions + globalViewIndexOptions = + SchemaBuilder.GlobalViewIndexOptions.withDefaults(); + globalViewIndexOptions.setLocal(false); + + TenantViewOptions tenantViewWithOverrideOptions = new TenantViewOptions(); + tenantViewWithOverrideOptions + .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewWithOverrideOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + tenantViewWithOverrideOptions.setTableProps(String.format("VIEW_TTL=%d", tenantViewTTL)); + + OtherOptions testCaseWhenAllCFMatchAndAllDefault = new OtherOptions(); + testCaseWhenAllCFMatchAndAllDefault.setTestName("testCaseWhenAllCFMatchAndAllDefault"); + testCaseWhenAllCFMatchAndAllDefault + .setTableCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault + .setGlobalViewCFs(Lists.newArrayList((String) null, null, null)); + testCaseWhenAllCFMatchAndAllDefault.setTenantViewCFs(Lists.newArrayList((String)null, null, null, null)); + + /** + * ************************************************************ + * Case 1: Build schema with TTL overridden by the tenant view. + * TTL for GLOBAL_VIEW - 300000 + * TTL for TENANT_VIEW - 30000 + * ************************************************************ + */ + schemaBuilder + .withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewWithOverrideOptions) + .withTenantViewIndexDefaults() + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault) + .buildWithNewTenant(); + + // Define the test data. + final String id = String.format("00A0y000%07d", 0); + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00B0y000%07d", rowIndex); + String + col1 = + String.format("a%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col2 = + String.format("b%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col3 = + String.format("c%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col4 = + String.format("d%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col5 = + String.format("e%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col6 = + String.format("f%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col7 = + String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col8 = + String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String + col9 = + String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + + return Lists.newArrayList( + new Object[] { id, zid, col1, col2, col3, col4, col5, col6, col7, + col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + List<String> columns = Lists.newArrayList("ID", "ZID", + "COL1", "COL2", "COL3", "COL4", "COL5", "COL6", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ID", "ZID"); + String + tenant1ConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + try (Connection writeConnection = DriverManager + .getConnection(tenant1ConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + // Case : count(1) sql + DataReader dataReader = new BasicDataReader(); + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String.format("SELECT count(1) as num_rows from %s HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + // Use the ttl overridden by the tenant. + validateExpiredRowsAreNotReturnedUsingCounts(tenantViewTTL, dataReader, schemaBuilder); + } + + /** + * ************************************************************ + * Case 2: Build schema with TTL NOT overridden by the tenant view. + * TTL for GLOBAL_VIEW - 300000 + * TTL for TENANT_VIEW - 300000 + * ************************************************************ + */ + TenantViewOptions tenantViewWithoutOverrideOptions = new TenantViewOptions(); + tenantViewWithoutOverrideOptions + .setTenantViewColumns(asList("ZID", "COL7", "COL8", "COL9")); + tenantViewWithoutOverrideOptions + .setTenantViewColumnTypes(asList("CHAR(15)", "VARCHAR", "VARCHAR", "VARCHAR")); + + schemaBuilder + .withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withGlobalViewIndexOptions(globalViewIndexOptions) + .withTenantViewOptions(tenantViewWithoutOverrideOptions) + .withTenantViewIndexDefaults() + .withOtherOptions(testCaseWhenAllCFMatchAndAllDefault) + .buildWithNewTenant(); + + String + tenant2ConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + try (Connection writeConnection = DriverManager + .getConnection(tenant2ConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + upsertData(dataWriter, DEFAULT_NUM_ROWS); + + // Case : count(1) sql + DataReader dataReader = new BasicDataReader(); + dataReader.setValidationColumns(Arrays.asList("num_rows")); + dataReader.setRowKeyColumns(Arrays.asList("num_rows")); + dataReader.setDML(String.format("SELECT count(1) as num_rows from %s HAVING count(1) > 0", + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data exists before ttl expiration. + long probeTimestamp = System.currentTimeMillis() + globalViewTTL/2; + validateRowsAreNotMaskedUsingCounts(probeTimestamp, dataReader, schemaBuilder); + // Validate data before and after ttl expiration. + // Use the global view ttl since that is what the view has inherited. + validateExpiredRowsAreNotReturnedUsingCounts(globalViewTTL, dataReader, schemaBuilder); + } + } + + @Test public void testDeleteIfExpiredOnTenantView() throws Exception { + + long viewTTL = 180000; + TableOptions tableOptions = TableOptions.withDefaults(); + tableOptions.getTableColumns().clear(); + tableOptions.getTableColumnTypes().clear(); + + TenantViewOptions tenantViewOptions = TenantViewOptions.withDefaults(); + tenantViewOptions.setTableProps(String.format("VIEW_TTL=%d", viewTTL)); + + // Define the test schema. + final SchemaBuilder schemaBuilder = new SchemaBuilder(getUrl()); + schemaBuilder.withTableOptions(tableOptions).withTenantViewOptions(tenantViewOptions) + .build(); + + // Define the test data. + DataSupplier dataSupplier = new DataSupplier() { + + @Override public List<Object> getValues(int rowIndex) { + Random rnd = new Random(); + String zid = String.format("00A0y000%07d", rowIndex); + String col7 = String.format("g%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col8 = String.format("h%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + String col9 = String.format("i%05d", rowIndex + rnd.nextInt(MAX_ROWS)); + return Lists.newArrayList(new Object[] { zid, col7, col8, col9 }); + } + }; + + // Create a test data reader/writer for the above schema. + DataWriter dataWriter = new BasicDataWriter(); + DataReader dataReader = new BasicDataReader(); + + List<String> columns = Lists.newArrayList("ZID", "COL7", "COL8", "COL9"); + List<String> rowKeyColumns = Lists.newArrayList("ZID"); + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + try (Connection writeConnection = DriverManager.getConnection(tenantConnectUrl)) { + writeConnection.setAutoCommit(true); + dataWriter.setConnection(writeConnection); + dataWriter.setDataSupplier(dataSupplier); + dataWriter.setUpsertColumns(columns); + dataWriter.setRowKeyColumns(rowKeyColumns); + dataWriter.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + dataReader.setValidationColumns(columns); + dataReader.setRowKeyColumns(rowKeyColumns); + dataReader.setDML(String.format("SELECT %s from %s", Joiner.on(",").join(columns), + schemaBuilder.getEntityTenantViewName())); + dataReader.setTargetEntity(schemaBuilder.getEntityTenantViewName()); + + // Validate data before and after ttl expiration. + upsertDataAndRunValidations(viewTTL, DEFAULT_NUM_ROWS, dataWriter, + dataReader, schemaBuilder); + } + + long scnTimestamp = System.currentTimeMillis()+viewTTL; + // Delete data by simulating expiration. + deleteData(schemaBuilder, scnTimestamp); + + // Verify after deleting TTL expired data. + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp)); + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> fetchedData = + fetchData(dataReader); + assertTrue("Expired rows should not be fetched", + fetchedData.rowKeySet().size() == 0); + } + } + + private void upsertDataAndRunValidations(long viewTTL, int numRowsToUpsert, + DataWriter dataWriter, DataReader dataReader, + SchemaBuilder schemaBuilder) + throws IOException, SQLException { + + //Insert for the first time and validate them. + validateExpiredRowsAreNotReturnedUsingData(viewTTL, upsertData(dataWriter, numRowsToUpsert), + dataReader, + schemaBuilder); + + // Update the above rows and validate the same. + validateExpiredRowsAreNotReturnedUsingData(viewTTL, upsertData(dataWriter, numRowsToUpsert), + dataReader, + schemaBuilder); + + } + + private void validateExpiredRowsAreNotReturnedUsingCounts( + long viewTTL, + DataReader dataReader, + SchemaBuilder schemaBuilder) throws SQLException { + + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + + // Verify before TTL expiration + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", + fetchedData != null); + assertTrue("Rows should exists before expiration", + fetchedData.rowKeySet().size() > 0); + } + + // Verify after TTL expiration + long scnTimestamp = System.currentTimeMillis(); + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp+(2*viewTTL))); + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", + fetchedData != null); + assertTrue("Expired rows should not be fetched", + fetchedData.rowKeySet().size() == 0); + } + } + + private void validateExpiredRowsAreNotReturnedUsingData( + long viewTTL, + com.google.common.collect.Table<String, String, Object> upsertedData, + DataReader dataReader, + SchemaBuilder schemaBuilder) throws SQLException { + + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + + // Verify before TTL expiration + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> fetchedData = + fetchData(dataReader); + assertTrue("Upserted data should not be null", + upsertedData != null); + assertTrue("Fetched data should not be null", + fetchedData != null); + + verifyRowsBeforeTTLExpiration(upsertedData, fetchedData); + } + + // Verify after TTL expiration + long scnTimestamp = System.currentTimeMillis(); + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp+(2*viewTTL))); + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", + fetchedData != null); + assertTrue("Expired rows should not be fetched", + fetchedData.rowKeySet().size() == 0); + } + + } + + private void validateRowsAreNotMaskedUsingCounts( + long probeTimestamp, + DataReader dataReader, + SchemaBuilder schemaBuilder) throws SQLException { + + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + + // Verify rows exists (not masked) at current time + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", + fetchedData != null); + assertTrue("Rows should exists before ttl expiration (now)", + fetchedData.rowKeySet().size() > 0); + } + + // Verify rows exists (not masked) at probed timestamp + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(probeTimestamp)); + try (Connection readConnection = DriverManager + .getConnection(tenantConnectUrl, props)) { + + dataReader.setConnection(readConnection); + com.google.common.collect.Table<String, String, Object> fetchedData = + fetchData(dataReader); + assertTrue("Fetched data should not be null", + fetchedData != null); + assertTrue("Rows should exists before ttl expiration (probe-timestamp)", + fetchedData.rowKeySet().size() > 0); + } + } + + private void verifyRowsBeforeTTLExpiration( + com.google.common.collect.Table<String, String, Object> upsertedData, + com.google.common.collect.Table<String, String, Object> fetchedData) { + + Set<String> upsertedRowKeys = upsertedData.rowKeySet(); + Set<String> fetchedRowKeys = fetchedData.rowKeySet(); + assertTrue("Upserted row keys should not be null", + upsertedRowKeys != null); + assertTrue("Fetched row keys should not be null", + fetchedRowKeys != null); + assertTrue("Rows upserted and fetched do not match", + upsertedRowKeys.equals(fetchedRowKeys)); + + + Set<String> fetchedCols = fetchedData.columnKeySet(); + for (String rowKey : fetchedRowKeys) { + for (String columnKey : fetchedCols) { + Object upsertedValue = upsertedData.get(rowKey, columnKey); + Object fetchedValue = fetchedData.get(rowKey, columnKey); + assertTrue("Upserted values should not be null", + upsertedValue != null); + assertTrue("Fetched values should not be null", + fetchedValue != null); + assertTrue("Values upserted and fetched do not match", + upsertedValue.equals(fetchedValue)); + } + } + } + + private com.google.common.collect.Table<String, String, Object> upsertData( + DataWriter dataWriter, int numRowsToUpsert) throws SQLException { + // Upsert rows + dataWriter.upsertRows(numRowsToUpsert); + return dataWriter.getDataTable(); + } + + private com.google.common.collect.Table<String, String, Object> fetchData( + DataReader dataReader) throws SQLException { + + dataReader.readRows(); + return dataReader.getDataTable(); + } + + private void deleteData(SchemaBuilder schemaBuilder, long scnTimestamp) throws SQLException { + + String viewName = schemaBuilder.getEntityTenantViewName(); + + Properties props = new Properties(); + props.setProperty("CurrentSCN", Long.toString(scnTimestamp)); + String + tenantConnectUrl = + getUrl() + ';' + TENANT_ID_ATTRIB + '=' + schemaBuilder.getDataOptions() + .getTenantId(); + + try (Connection deleteConnection = DriverManager.getConnection(tenantConnectUrl, props); + final Statement statement = deleteConnection.createStatement()) { + deleteConnection.setAutoCommit(true); + + final String deleteIfExpiredStatement = String.format("select * from %s", viewName); + Preconditions.checkNotNull(deleteIfExpiredStatement); + + final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); + // Optimize the query plan so that we potentially use secondary indexes + final QueryPlan queryPlan = pstmt.optimizeQuery(deleteIfExpiredStatement); + final Scan scan = queryPlan.getContext().getScan(); + + + PTable table = PhoenixRuntime.getTable(deleteConnection, schemaBuilder.getDataOptions().getTenantId(), viewName); + + byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table); + byte[] + emptyColumnName = + table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ? + QueryConstants.EMPTY_COLUMN_BYTES : + table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME); + + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName); + scan.setAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED, PDataType.TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED, PDataType.FALSE_BYTES); + scan.setAttribute(BaseScannerRegionObserver.VIEW_TTL, Bytes.toBytes(Long.valueOf(table.getViewTTL()))); + PhoenixResultSet rs = pstmt.newResultSet(queryPlan.iterator(), queryPlan.getProjector(), queryPlan.getContext()); + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java index 4392e23..a37b068 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java @@ -35,11 +35,10 @@ import org.apache.phoenix.schema.*; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; -import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; -import static org.apache.phoenix.util.IndexUtil.addEmptyColumnToScan; +import static org.apache.phoenix.util.ScanUtil.addEmptyColumnToScan; /** diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 6e0a1e4..9a96bbe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -62,7 +62,6 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TransactionUtil; - abstract public class BaseScannerRegionObserver extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(BaseScannerRegionObserver.class); @@ -110,6 +109,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String RUN_UPDATE_STATS_ASYNC_ATTRIB = "_RunUpdateStatsAsync"; public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK"; public static final String TX_SCN = "_TxScn"; + public static final String VIEW_TTL = "_ViewTTL"; + public static final String MASK_VIEW_TTL_EXPIRED = "_MASK_TTL_EXPIRED"; + public static final String DELETE_VIEW_TTL_EXPIRED = "_DELETE_TTL_EXPIRED"; public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow"; public static final String REPLAY_WRITES = "_IGNORE_NEWER_MUTATIONS"; public final static String SCAN_OFFSET = "_RowOffset"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java new file mode 100644 index 0000000..e133b10 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLAwareRegionObserver.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.filter.ColumnProjectionFilter; +import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; +import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ScanUtil; +import org.apache.phoenix.util.ServerUtil; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; + +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME; +import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME; + +/** + * + * Coprocessor that checks whether the row is expired based on the TTL spec. + * + */ +public class TTLAwareRegionObserver extends BaseRegionObserver { + private static final Log LOG = LogFactory.getLog(TTLAwareRegionObserver.class); + + /** + * A region scanner that checks the TTL expiration of rows + */ + private static class TTLAwareRegionScanner implements RegionScanner { + RegionScanner scanner; + private Scan scan; + private byte[] emptyCF; + private byte[] emptyCQ; + private Region region; + private long minTimestamp; + private long maxTimestamp; + private long now; + private boolean deleteIfExpired; + private boolean maskIfExpired; + + public TTLAwareRegionScanner(RegionCoprocessorEnvironment env, + Scan scan, + RegionScanner scanner) throws IOException { + this.scan = scan; + this.scanner = scanner; + + deleteIfExpired = ScanUtil.isDeleteTTLExpiredRows(scan) ? true : false; + maskIfExpired = !deleteIfExpired && ScanUtil.isMaskTTLExpiredRows(scan) ? true : false;; + + region = env.getRegion(); + emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME); + emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME); + byte[] txnScn = scan.getAttribute(BaseScannerRegionObserver.TX_SCN); + if (txnScn!=null) { + TimeRange timeRange = scan.getTimeRange(); + scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn)); + } + minTimestamp = scan.getTimeRange().getMin(); + maxTimestamp = scan.getTimeRange().getMax(); + now = maxTimestamp != HConstants.LATEST_TIMESTAMP ? maxTimestamp : EnvironmentEdgeManager.currentTimeMillis(); + } + + @Override + public int getBatch() { + return scanner.getBatch(); + } + + @Override + public long getMaxResultSize() { + return scanner.getMaxResultSize(); + } + + @Override + public boolean next(List<Cell> result) throws IOException { + try { + boolean hasMore; + do { + hasMore = scanner.next(result); + if (result.isEmpty()) { + break; + } + + if (maskIfExpired && checkRowNotExpired(result)) { + break; + } + + if (deleteIfExpired && deleteRowIfExpired(result)) { + break; + } + // skip this row + // 1. if the row has expired (checkRowNotExpired returned false) + // 2. if the row was not deleted (deleteRowIfExpired returned false and + // do not want it to count towards the deleted count) + result.clear(); + } while (hasMore); + return hasMore; + } catch (Throwable t) { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); + return false; // impossible + } + } + + @Override + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + throw new IOException("next with scannerContext should not be called in Phoenix environment"); + } + + @Override + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { + throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment"); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + + @Override + public HRegionInfo getRegionInfo() { + return scanner.getRegionInfo(); + } + + @Override + public boolean isFilterDone() throws IOException { + return scanner.isFilterDone(); + } + + @Override + public boolean reseek(byte[] row) throws IOException { + return scanner.reseek(row); + } + + @Override + public long getMvccReadPoint() { + return scanner.getMvccReadPoint(); + } + + @Override + public boolean nextRaw(List<Cell> result) throws IOException { + try { + boolean hasMore; + do { + hasMore = scanner.nextRaw(result); + if (result.isEmpty()) { + break; + } + if (maskIfExpired && checkRowNotExpired(result)) { + break; + } + + if (deleteIfExpired && deleteRowIfExpired(result)) { + break; + } + // skip this row + // 1. if the row has expired (checkRowNotExpired returned false) + // 2. if the row was not deleted (deleteRowIfExpired returned false and + // do not want it to count towards the deleted count) + result.clear(); + } while (hasMore); + return hasMore; + } catch (Throwable t) { + ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t); + return false; // impossible + } + } + + /** + * @param cellList is an input and output parameter and will either include a valid row or be an empty list + * @return true if row expired and deleted or empty, otherwise false + * @throws IOException + */ + private boolean deleteRowIfExpired(List<Cell> cellList) throws IOException { + + long cellListSize = cellList.size(); + if (cellListSize == 0) { + return true; + } + + Iterator<Cell> cellIterator = cellList.iterator(); + Cell firstCell = cellIterator.next(); + byte[] rowKey = new byte[firstCell.getRowLength()]; + System.arraycopy(firstCell.getRowArray(), firstCell.getRowOffset(), rowKey, 0, firstCell.getRowLength()); + + boolean isRowExpired = !checkRowNotExpired(cellList); + if (isRowExpired) { + long ttl = ScanUtil.getViewTTL(this.scan) ; + long ts = getMaxTimestamp(cellList); + LOG.debug(String.format("***** VIEW-TTL: Deleting region = %s, row = %s, delete-ts = %d, max-ts = %d ****** ", + region.getRegionInfo().getTable().getNameAsString(), + Bytes.toString(rowKey), + now-ttl, ts)); + Delete del = new Delete(rowKey, now-ttl); + Mutation[] mutations = new Mutation[]{del}; + region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); + return true; + } + return false; + } + + + private boolean isEmptyColumn(Cell cell) { + return Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + emptyCF, 0, emptyCF.length) == 0 && + Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + emptyCQ, 0, emptyCQ.length) == 0; + } + + // TODO : Remove it after we verify all SQLs include the empty column. + // Before we added ScanUtil.addEmptyColumnToScan some queries like select count(*) did not include + // the empty column in scan, thus this method was the fallback in those cases. + private boolean checkEmptyColumnNotExpired(byte[] rowKey) throws IOException { + LOG.warn("Scan " + scan + " did not return the empty column for " + region.getRegionInfo().getTable().getNameAsString()); + Get get = new Get(rowKey); + get.setTimeRange(minTimestamp, maxTimestamp); + get.addColumn(emptyCF, emptyCQ); + Result result = region.get(get); + if (result.isEmpty()) { + LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString()); + return false; + } + return !isTTLExpired(result.getColumnLatestCell(emptyCF, emptyCQ)); + } + + /** + * @param cellList is an input and output parameter and will either include a valid row or be an empty list + * @return true if row not expired, otherwise false + * @throws IOException + */ + private boolean checkRowNotExpired(List<Cell> cellList) throws IOException { + long cellListSize = cellList.size(); + Cell cell = null; + if (cellListSize == 0) { + return true; + } + Iterator<Cell> cellIterator = cellList.iterator(); + while (cellIterator.hasNext()) { + cell = cellIterator.next(); + if (isEmptyColumn(cell)) { + LOG.debug(String.format("**********VIEW-TTL: Row expired for [%s], expired = %s ***************", cell.toString(), isTTLExpired(cell))); + // Empty column is not supposed to be returned to the client except it is the only column included + // in the scan + if (cellListSize > 1) { + cellIterator.remove(); + } + return !isTTLExpired(cell); + } + } + byte[] rowKey = new byte[cell.getRowLength()]; + System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength()); + return checkEmptyColumnNotExpired(rowKey); + } + + private long getMaxTimestamp(List<Cell> cellList) { + long maxTs = 0; + long ts = 0; + Iterator<Cell> cellIterator = cellList.iterator(); + while (cellIterator.hasNext()) { + Cell cell = cellIterator.next(); + ts = cell.getTimestamp(); + if (ts > maxTs) { + maxTs = ts; + } + } + return maxTs; + } + + private boolean isTTLExpired(Cell cell) { + long ts = cell.getTimestamp(); + long ttl = ScanUtil.getViewTTL(this.scan) ; + if (ts + ttl < now) { + return true; + } + return false; + } + } + + + + @Override + public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, + Scan scan, RegionScanner s) throws IOException { + + if (!ScanUtil.isMaskTTLExpiredRows(scan) && !ScanUtil.isDeleteTTLExpiredRows(scan)) { + return s; + } + + LOG.debug(String.format( + "********** VIEW-TTL: TTLAwareRegionObserver::postScannerOpen TTL for table = [%s], scan = [%s], VIEW_TTL = %d ***************", + s.getRegionInfo().getTable().getNameAsString(), + scan.toJSON(Integer.MAX_VALUE), + ScanUtil.getViewTTL(scan))); + return new TTLAwareRegionScanner(c.getEnvironment(), scan, s); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 69bf8c1..68779aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -133,7 +133,8 @@ public class TableResultIterator implements ResultIterator { this.caches = caches; this.retry=plan.getContext().getConnection().getQueryServices().getProps() .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); - IndexUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection()); + ScanUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection()); + ScanUtil.setScanAttributesForViewTTL(scan, table, plan.getContext().getConnection()); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 4f79097..382c3a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -158,6 +158,7 @@ import org.apache.phoenix.coprocessor.MetaDataRegionObserver; import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.coprocessor.SequenceRegionObserver; import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl; +import org.apache.phoenix.coprocessor.TTLAwareRegionObserver; import org.apache.phoenix.coprocessor.TaskRegionObserver; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.generated.ChildLinkMetaDataProtos.CreateViewAddChildLinkRequest; @@ -1091,6 +1092,15 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } } + + // The priority for this co-processor should be set higher than the GlobalIndexChecker so that the read repair scans + // are intercepted by the TTLAwareRegionObserver and only the rows that are not ttl-expired are returned. + if (!SchemaUtil.isSystemTable(tableName)) { + if (!descriptor.hasCoprocessor(TTLAwareRegionObserver.class.getName())) { + descriptor.addCoprocessor(TTLAwareRegionObserver.class.getName(), null, priority-2, null); + } + } + } catch (IOException e) { throw ServerUtil.parseServerException(e); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index bbc70fc..aa989fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -21,11 +21,9 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERS import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; -import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME; import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER; -import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import static org.apache.phoenix.util.PhoenixRuntime.getTable; import java.io.ByteArrayInputStream; @@ -39,7 +37,6 @@ import java.util.Iterator; import java.util.List; import java.util.ListIterator; import java.util.Map; -import java.util.NavigableSet; import java.util.concurrent.TimeUnit; import com.google.common.cache.Cache; @@ -60,8 +57,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -83,7 +78,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; -import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; @@ -91,9 +85,6 @@ import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor; -import org.apache.phoenix.filter.ColumnProjectionFilter; -import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; -import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -101,7 +92,6 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.index.IndexMaintainer; -import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; @@ -141,6 +131,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import com.google.common.collect.Lists; public class IndexUtil { + public static final String INDEX_COLUMN_NAME_SEP = ":"; public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP); @@ -919,93 +910,4 @@ public class IndexUtil { throw new IOException(e); } } - - private static boolean containsOneOrMoreColumn(Scan scan) { - Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); - if (familyMap == null || familyMap.isEmpty()) { - return false; - } - for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { - NavigableSet<byte[]> family = entry.getValue(); - if (family != null && !family.isEmpty()) { - return true; - } - } - return false; - } - - public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) { - boolean addedEmptyColumn = false; - Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan); - while (iterator.hasNext()) { - Filter filter = iterator.next(); - if (filter instanceof EncodedQualifiersColumnProjectionFilter) { - ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME); - if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { - scan.addColumn(emptyCF, emptyCQ); - } - } - else if (filter instanceof ColumnProjectionFilter) { - ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), new ImmutableBytesPtr(emptyCQ)); - if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { - scan.addColumn(emptyCF, emptyCQ); - } - } - else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) { - ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME); - } - else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) { - scan.addColumn(emptyCF, emptyCQ); - addedEmptyColumn = true; - } - } - if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { - scan.addColumn(emptyCF, emptyCQ); - } - } - - public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, PhoenixConnection phoenixConnection) throws SQLException { - if (table.isTransactional() || table.getType() != PTableType.INDEX) { - return; - } - PTable indexTable = table; - if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) { - return; - } - String schemaName = indexTable.getParentSchemaName().getString(); - String tableName = indexTable.getParentTableName().getString(); - PTable dataTable; - try { - dataTable = PhoenixRuntime.getTable(phoenixConnection, SchemaUtil.getTableName(schemaName, tableName)); - } catch (TableNotFoundException e) { - // This index table must be being deleted. No need to set the scan attributes - return; - } - // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child - // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work - if (indexTable.getViewIndexId() != null && indexTable.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) { - int lastIndexOf = indexTable.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR); - String indexName = indexTable.getName().getString().substring(lastIndexOf + 1); - indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName); - } - if (!dataTable.getIndexes().contains(indexTable)) { - return; - } - if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) { - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), phoenixConnection); - scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); - } - scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES); - scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, dataTable.getPhysicalName().getBytes()); - IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection); - byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); - byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); - scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF); - scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ); - if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) { - BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable); - } - addEmptyColumnToScan(scan, emptyCF, emptyCQ); - } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 28ea349..1899ebc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -23,6 +23,8 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_AN import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; +import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME; +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; import java.io.IOException; import java.sql.SQLException; @@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; @@ -53,13 +56,19 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.DescVarLengthFastByteComparisons; import org.apache.phoenix.filter.BooleanExpressionFilter; +import org.apache.phoenix.filter.ColumnProjectionFilter; import org.apache.phoenix.filter.DistinctPrefixFilter; +import org.apache.phoenix.filter.EncodedQualifiersColumnProjectionFilter; import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.KeyRange.Bound; import org.apache.phoenix.query.QueryConstants; @@ -70,8 +79,10 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarbinary; @@ -960,4 +971,140 @@ public class ScanUtil { public static void setClientVersion(Scan scan, int version) { scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(version)); } + + public static long getViewTTL(Scan scan) { + byte[] viewTTL = scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL); + if (viewTTL == null) { + return 0L; + } + return Bytes.toLong(viewTTL); + } + + public static boolean isMaskTTLExpiredRows(Scan scan) { + return scan.getAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED) != null && + (Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED), + PDataType.TRUE_BYTES) == 0) + && scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL) != null; + } + + public static boolean isDeleteTTLExpiredRows(Scan scan) { + return scan.getAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED) != null && ( + Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.DELETE_VIEW_TTL_EXPIRED), + PDataType.TRUE_BYTES) == 0) + && scan.getAttribute(BaseScannerRegionObserver.VIEW_TTL) != null; + } + + public static void addEmptyColumnToScan(Scan scan, byte[] emptyCF, byte[] emptyCQ) { + boolean addedEmptyColumn = false; + Iterator<Filter> iterator = ScanUtil.getFilterIterator(scan); + while (iterator.hasNext()) { + Filter filter = iterator.next(); + if (filter instanceof EncodedQualifiersColumnProjectionFilter) { + ((EncodedQualifiersColumnProjectionFilter) filter).addTrackedColumn(ENCODED_EMPTY_COLUMN_NAME); + if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { + scan.addColumn(emptyCF, emptyCQ); + } + } else if (filter instanceof ColumnProjectionFilter) { + ((ColumnProjectionFilter) filter).addTrackedColumn(new ImmutableBytesPtr(emptyCF), + new ImmutableBytesPtr(emptyCQ)); + if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { + scan.addColumn(emptyCF, emptyCQ); + } + } else if (filter instanceof MultiEncodedCQKeyValueComparisonFilter) { + ((MultiEncodedCQKeyValueComparisonFilter) filter).setMinQualifier(ENCODED_EMPTY_COLUMN_NAME); + } else if (!addedEmptyColumn && filter instanceof FirstKeyOnlyFilter) { + scan.addColumn(emptyCF, emptyCQ); + addedEmptyColumn = true; + } + } + if (!addedEmptyColumn && containsOneOrMoreColumn(scan)) { + scan.addColumn(emptyCF, emptyCQ); + } + } + + private static boolean containsOneOrMoreColumn(Scan scan) { + Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); + if (familyMap == null || familyMap.isEmpty()) { + return false; + } + for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { + NavigableSet<byte[]> family = entry.getValue(); + if (family != null && !family.isEmpty()) { + return true; + } + } + return false; + } + + public static void setScanAttributesForIndexReadRepair(Scan scan, PTable table, + PhoenixConnection phoenixConnection) throws SQLException { + if (table.isTransactional() || table.getType() != PTableType.INDEX) { + return; + } + + PTable indexTable = table; + if (indexTable.getIndexType() != PTable.IndexType.GLOBAL) { + return; + } + String schemaName = indexTable.getParentSchemaName().getString(); + String tableName = indexTable.getParentTableName().getString(); + PTable dataTable; + try { + dataTable = + PhoenixRuntime.getTable(phoenixConnection, + SchemaUtil.getTableName(schemaName, tableName)); + } catch (TableNotFoundException e) { + // This index table must be being deleted. No need to set the scan attributes + return; + } + // MetaDataClient modifies the index table name for view indexes if the parent view of an index has a child + // view. This, we need to recreate a PTable object with the correct table name for the rest of this code to work + if (table.getName().getString().contains(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR)) { + int lastIndexOf = table.getName().getString().lastIndexOf(QueryConstants.CHILD_VIEW_INDEX_NAME_SEPARATOR); + String indexName = table.getName().getString().substring(lastIndexOf + 1); + indexTable = PhoenixRuntime.getTable(phoenixConnection, indexName); + } + + if (!dataTable.getIndexes().contains(indexTable)) { + return; + } + if (scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD) == null) { + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(indexTable), + phoenixConnection); + scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + } + scan.setAttribute(BaseScannerRegionObserver.CHECK_VERIFY_COLUMN, TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, + dataTable.getPhysicalName().getBytes()); + IndexMaintainer indexMaintainer = indexTable.getIndexMaintainer(dataTable, phoenixConnection); + byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(); + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyCF); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyCQ); + if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) { + BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable); + } + addEmptyColumnToScan(scan, emptyCF, emptyCQ); + } + + public static void setScanAttributesForViewTTL(Scan scan, PTable table, + PhoenixConnection phoenixConnection) throws SQLException { + + if (table.getViewTTL() != 0) { + byte[] emptyColumnFamilyName = SchemaUtil.getEmptyColumnFamily(table); + byte[] + emptyColumnName = + table.getEncodingScheme() == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ? + QueryConstants.EMPTY_COLUMN_BYTES : + table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME); + + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName); + scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName); + scan.setAttribute(BaseScannerRegionObserver.MASK_VIEW_TTL_EXPIRED, PDataType.TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserver.VIEW_TTL, Bytes.toBytes(Long.valueOf(table.getViewTTL()))); + addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName); + } + + } } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java index ccc378f..8158df9 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/PhoenixTestBuilder.java @@ -20,7 +20,11 @@ package org.apache.phoenix.query; import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.collect.Table; +import com.google.common.collect.TreeBasedTable; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.util.PropertiesUtil; @@ -31,9 +35,12 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.util.List; import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; @@ -189,6 +196,41 @@ public class PhoenixTestBuilder { List<Object> getValues(int rowIndex); } + // A Data Reader to be used in tests to read test data from test db. + public interface DataReader { + // returns the columns that need to be projected during DML queries, + List<String> getValidationColumns(); + + void setValidationColumns(List<String> validationColumns); + + // returns the columns that represent the pk/unique key for this data set, + List<String> getRowKeyColumns(); + + void setRowKeyColumns(List<String> rowKeyColumns); + + // returns the connection to be used for DML queries. + Connection getConnection(); + + void setConnection(Connection connection); + + // returns the target entity - whether to use the table, global-view, the tenant-view or + // an index table. + String getTargetEntity(); + + void setTargetEntity(String targetEntity); + + // Build the DML statement and return the SQL string. + String getDML(); + + String setDML(String dmlStatement); + + // template method to read a batch of rows using the above sql. + void readRows() throws SQLException; + + // Get the data that was read as a Table. + Table<String, String, Object> getDataTable(); + } + // A Data Writer to be used in tests to upsert sample data (@see TestDataSupplier) into the sample schema. public interface DataWriter { // returns the columns that need to be upserted, @@ -212,66 +254,142 @@ public class PhoenixTestBuilder { void setTargetEntity(String targetEntity); + // returns the columns that is set asthe pk/unique key for this data set, + List<String> getRowKeyColumns(); + + void setRowKeyColumns(List<String> rowKeyColumns); + // return the data provider for this writer DataSupplier getTestDataSupplier(); - // template method to upsert rows using the above info. - void upsertRow(int rowIndex) throws SQLException; - void setDataSupplier(DataSupplier dataSupplier); + + // template method to upsert a single row using the above info. + List<Object> upsertRow(int rowIndex) throws SQLException; + + // template method to upsert a batch of rows using the above info. + void upsertRows(int numRows) throws SQLException; + + // Get the data that was written as a Table + Table<String, String, Object> getDataTable(); } - /** - * Test SchemaBuilder defaults. - */ - public static class DDLDefaults { - public static final int MAX_ROWS = 10000; - public static List<String> TABLE_PK_TYPES = asList("CHAR(15)", "CHAR(3)"); - public static List<String> GLOBAL_VIEW_PK_TYPES = asList("CHAR(15)"); - public static List<String> TENANT_VIEW_PK_TYPES = asList("CHAR(15)"); + // Provides template method for returning result set + public static abstract class AbstractDataReader implements DataReader { + Table<String, String, Object> dataTable = TreeBasedTable.create(); - public static List<String> COLUMN_TYPES = asList("VARCHAR", "VARCHAR", "VARCHAR"); - public static List<String> TABLE_COLUMNS = asList("COL1", "COL2", "COL3"); - public static List<String> GLOBAL_VIEW_COLUMNS = asList("COL4", "COL5", "COL6"); - public static List<String> TENANT_VIEW_COLUMNS = asList("COL7", "COL8", "COL9"); + public Table<String, String, Object> getDataTable() { + return dataTable; + } - public static List<String> TABLE_COLUMN_FAMILIES = asList(null, null, null); - public static List<String> GLOBAL_VIEW_COLUMN_FAMILIES = asList(null, null, null); - public static List<String> TENANT_VIEW_COLUMN_FAMILIES = asList(null, null, null); + // Read batch of rows + public void readRows() throws SQLException { + dataTable.clear(); + String sql = getDML(); + Connection connection = getConnection(); + try (Statement stmt = connection.createStatement()) { + + final PhoenixStatement pstmt = stmt.unwrap(PhoenixStatement.class); + ResultSet rs = pstmt.executeQuery(sql); + List<String> cols = getValidationColumns(); + List<Object> values = Lists.newArrayList(); + Set<String> rowKeys = getRowKeyColumns() == null || getRowKeyColumns().isEmpty() ? + Sets.<String>newHashSet() : + Sets.newHashSet(getRowKeyColumns()); + List<String> rowKeyParts = Lists.newArrayList(); + while (rs.next()) { + for (String col : cols) { + Object val = rs.getObject(col); + values.add(val); + if (rowKeys.isEmpty()) { + rowKeyParts.add(val.toString()); + } + else if (rowKeys.contains(col)) { + rowKeyParts.add(val.toString()); + } + } - public static List<String> TABLE_PK_COLUMNS = asList("OID", "KP"); - public static List<String> GLOBAL_VIEW_PK_COLUMNS = asList("ID"); - public static List<String> TENANT_VIEW_PK_COLUMNS = asList("ZID"); + String rowKey = Joiner.on("-").join(rowKeyParts); + for (int v = 0; v < values.size(); v++) { + dataTable.put(rowKey,cols.get(v), values.get(v)); + } + values.clear(); + rowKeyParts.clear(); + } + LOGGER.info(String.format("########## rows: %d", dataTable.rowKeySet().size())); - public static List<String> TABLE_INDEX_COLUMNS = asList("COL1"); - public static List<String> TABLE_INCLUDE_COLUMNS = asList("COL3"); + } catch (SQLException e) { + LOGGER.error(String.format(" Error [%s] initializing Reader. ", + e.getMessage())); + throw e; + } + } + } - public static List<String> GLOBAL_VIEW_INDEX_COLUMNS = asList("COL4"); - public static List<String> GLOBAL_VIEW_INCLUDE_COLUMNS = asList("COL6"); + // An implementation of the DataReader. + public static class BasicDataReader extends AbstractDataReader { - public static List<String> TENANT_VIEW_INDEX_COLUMNS = asList("COL9"); - public static List<String> TENANT_VIEW_INCLUDE_COLUMNS = asList("COL7"); + Connection connection; + String targetEntity; + String dmlStatement; + List<String> validationColumns; + List<String> rowKeyColumns; - public static String - DEFAULT_TABLE_PROPS = - "COLUMN_ENCODED_BYTES=0, MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'"; - public static String DEFAULT_TABLE_INDEX_PROPS = ""; - public static String DEFAULT_GLOBAL_VIEW_PROPS = ""; - public static String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = ""; - public static String DEFAULT_TENANT_VIEW_PROPS = ""; - public static String DEFAULT_TENANT_VIEW_INDEX_PROPS = ""; - public static String DEFAULT_KP = "0EC"; - public static String DEFAULT_SCHEMA_NAME = "TEST_ENTITY"; - public static String DEFAULT_TENANT_ID_FMT = "00D0t%03d%s"; - public static String DEFAULT_CONNECT_URL = "jdbc:phoenix:localhost"; + @Override public String getDML() { + return this.dmlStatement; + } + @Override public String setDML(String dmlStatement) { + return this.dmlStatement = dmlStatement; + } + + // returns the columns that need to be projected during DML queries, + @Override public List<String> getValidationColumns() { + return this.validationColumns; + } + + @Override public void setValidationColumns(List<String> validationColumns) { + this.validationColumns = validationColumns; + } + + // returns the columns that is set as the pk/unique key for this data set, + @Override public List<String> getRowKeyColumns() { + return this.rowKeyColumns; + } + + @Override public void setRowKeyColumns(List<String> rowKeyColumns) { + this.rowKeyColumns = rowKeyColumns; + } + + @Override public Connection getConnection() { + return connection; + } + + @Override public void setConnection(Connection connection) { + this.connection = connection; + } + + @Override public String getTargetEntity() { + return targetEntity; + } + + @Override public void setTargetEntity(String targetEntity) { + this.targetEntity = targetEntity; + } } + // Provides template method for upserting rows public static abstract class AbstractDataWriter implements DataWriter { + Table<String, String, Object> dataTable = TreeBasedTable.create(); - public void upsertRow(int rowIndex) throws SQLException { + public Table<String, String, Object> getDataTable() { + return dataTable; + } + + // Upsert one row. + public List<Object> upsertRow(int rowIndex) throws SQLException { List<String> upsertColumns = Lists.newArrayList(); List<Object> upsertValues = Lists.newArrayList(); @@ -294,7 +412,6 @@ public class PhoenixTestBuilder { buf.append("?,"); } buf.setCharAt(buf.length() - 1, ')'); - LOGGER.info(buf.toString()); Connection connection = getConnection(); @@ -305,16 +422,83 @@ public class PhoenixTestBuilder { stmt.execute(); connection.commit(); } + return upsertValues; + } + + // Upsert batch of rows. + public void upsertRows(int numRows) throws SQLException { + dataTable.clear(); + List<String> upsertColumns = Lists.newArrayList(); + List<Integer> rowKeyPositions = Lists.newArrayList(); + + // Figure out the upsert columns based on whether this is a full or partial row update. + boolean isFullRowUpdate = getColumnPositionsToUpdate().isEmpty(); + if (isFullRowUpdate) { + upsertColumns.addAll(getUpsertColumns()); + } else { + List<String> tmpColumns = getUpsertColumns(); + for (int i : getColumnPositionsToUpdate()) { + upsertColumns.add(tmpColumns.get(i)); + } + } + + Set<String> rowKeys = getRowKeyColumns() == null || getRowKeyColumns().isEmpty() ? + Sets.<String>newHashSet(getUpsertColumns()) : + Sets.newHashSet(getRowKeyColumns()); + + StringBuilder buf = new StringBuilder("UPSERT INTO "); + buf.append(getTargetEntity()); + buf.append(" (").append(Joiner.on(",").join(upsertColumns)).append(") VALUES("); + for (int i = 0; i < upsertColumns.size(); i++) { + buf.append("?,"); + if (rowKeys.contains(upsertColumns.get(i))) { + rowKeyPositions.add(i); + } + } + buf.setCharAt(buf.length() - 1, ')'); + LOGGER.info(buf.toString()); + + Connection connection = getConnection(); + try (PreparedStatement stmt = connection.prepareStatement(buf.toString())) { + + for (int r = 1; r <= numRows; r++) { + List<Object> upsertValues = Lists.newArrayList(); + if (isFullRowUpdate) { + upsertValues.addAll(getTestDataSupplier().getValues(r)); + } else { + List<Object> tmpValues = getTestDataSupplier().getValues(r); + for (int c : getColumnPositionsToUpdate()) { + upsertValues.add(tmpValues.get(c)); + } + } + + List<String> rowKeyParts = Lists.newArrayList(); + for (int position : rowKeyPositions) { + rowKeyParts.add(upsertValues.get(position).toString()); + } + String rowKey = Joiner.on("-").join(rowKeyParts); + + for (int v = 0; v < upsertValues.size(); v++) { + stmt.setObject(v + 1, upsertValues.get(v)); + dataTable.put(rowKey,upsertColumns.get(v), upsertValues.get(v)); + } + stmt.execute(); + } + connection.commit(); + } } + } - // An implementation of the TestDataWriter. + // An implementation of the DataWriter. public static class BasicDataWriter extends AbstractDataWriter { List<String> upsertColumns = Lists.newArrayList(); List<Integer> columnPositionsToUpdate = Lists.newArrayList(); DataSupplier dataSupplier; Connection connection; String targetEntity; + List<String> rowKeyColumns; + @Override public List<String> getUpsertColumns() { return upsertColumns; @@ -348,6 +532,15 @@ public class PhoenixTestBuilder { this.targetEntity = targetEntity; } + // returns the columns that is set as the pk/unique key for this data set, + @Override public List<String> getRowKeyColumns() { + return this.rowKeyColumns; + } + + @Override public void setRowKeyColumns(List<String> rowKeyColumns) { + this.rowKeyColumns = rowKeyColumns; + } + @Override public DataSupplier getTestDataSupplier() { return dataSupplier; } @@ -358,6 +551,53 @@ public class PhoenixTestBuilder { } /** + * Test SchemaBuilder defaults. + */ + public static class DDLDefaults { + public static final int MAX_ROWS = 10000; + public static List<String> TABLE_PK_TYPES = asList("CHAR(15)", "CHAR(3)"); + public static List<String> GLOBAL_VIEW_PK_TYPES = asList("CHAR(15)"); + public static List<String> TENANT_VIEW_PK_TYPES = asList("CHAR(15)"); + + public static List<String> COLUMN_TYPES = asList("VARCHAR", "VARCHAR", "VARCHAR"); + public static List<String> TABLE_COLUMNS = asList("COL1", "COL2", "COL3"); + public static List<String> GLOBAL_VIEW_COLUMNS = asList("COL4", "COL5", "COL6"); + public static List<String> TENANT_VIEW_COLUMNS = asList("COL7", "COL8", "COL9"); + + public static List<String> TABLE_COLUMN_FAMILIES = asList(null, null, null); + public static List<String> GLOBAL_VIEW_COLUMN_FAMILIES = asList(null, null, null); + public static List<String> TENANT_VIEW_COLUMN_FAMILIES = asList(null, null, null); + + public static List<String> TABLE_PK_COLUMNS = asList("OID", "KP"); + public static List<String> GLOBAL_VIEW_PK_COLUMNS = asList("ID"); + public static List<String> TENANT_VIEW_PK_COLUMNS = asList("ZID"); + + public static List<String> TABLE_INDEX_COLUMNS = asList("COL1"); + public static List<String> TABLE_INCLUDE_COLUMNS = asList("COL3"); + + public static List<String> GLOBAL_VIEW_INDEX_COLUMNS = asList("COL4"); + public static List<String> GLOBAL_VIEW_INCLUDE_COLUMNS = asList("COL6"); + + public static List<String> TENANT_VIEW_INDEX_COLUMNS = asList("COL9"); + public static List<String> TENANT_VIEW_INCLUDE_COLUMNS = asList("COL7"); + + public static String + DEFAULT_TABLE_PROPS = + "COLUMN_ENCODED_BYTES=0, MULTI_TENANT=true,DEFAULT_COLUMN_FAMILY='Z'"; + public static String DEFAULT_TABLE_INDEX_PROPS = ""; + public static String DEFAULT_GLOBAL_VIEW_PROPS = ""; + public static String DEFAULT_GLOBAL_VIEW_INDEX_PROPS = ""; + public static String DEFAULT_TENANT_VIEW_PROPS = ""; + public static String DEFAULT_TENANT_VIEW_INDEX_PROPS = ""; + public static String DEFAULT_KP = "ECZ"; + public static String DEFAULT_SCHEMA_NAME = "TEST_ENTITY"; + public static String DEFAULT_TENANT_ID_FMT = "00D0t%03d%s"; + + public static String DEFAULT_CONNECT_URL = "jdbc:phoenix:localhost"; + + } + + /** * Schema builder for test writers to prepare various test scenarios. * It can be used to define the following type of schemas - * 1. Simple Table. @@ -741,20 +981,20 @@ public class PhoenixTestBuilder { "useTenantConnectionForGlobalView and useGlobalConnectionOnly both cannot be true"); } - String tableName = SchemaUtil.getEscapedArgument("T_" + dataOptions.uniqueName); - String globalViewName = SchemaUtil.getEscapedArgument("V_" + dataOptions.uniqueName); + String tableName = SchemaUtil.normalizeIdentifier("T_" + dataOptions.uniqueName); + String globalViewName = SchemaUtil.normalizeIdentifier("V_" + dataOptions.uniqueName); String tableSchemaName = - tableEnabled ? SchemaUtil.getEscapedArgument(tableOptions.schemaName) : ""; + tableEnabled ? SchemaUtil.normalizeIdentifier(tableOptions.schemaName) : ""; String globalViewSchemaName = globalViewEnabled ? - SchemaUtil.getEscapedArgument(globalViewOptions.schemaName) : + SchemaUtil.normalizeIdentifier(globalViewOptions.schemaName) : ""; String tenantViewSchemaName = tenantViewEnabled ? - SchemaUtil.getEscapedArgument(tenantViewOptions.schemaName) : + SchemaUtil.normalizeIdentifier(tenantViewOptions.schemaName) : ""; entityTableName = SchemaUtil.getTableName(tableSchemaName, tableName); entityGlobalViewName = SchemaUtil.getTableName(globalViewSchemaName, globalViewName); @@ -767,7 +1007,7 @@ public class PhoenixTestBuilder { (String.format("Z%02d", dataOptions.getViewNumber())) : DDLDefaults.DEFAULT_KP); - String tenantViewName = SchemaUtil.getEscapedArgument(entityKeyPrefix); + String tenantViewName = SchemaUtil.normalizeIdentifier(entityKeyPrefix); entityTenantViewName = SchemaUtil.getTableName(tenantViewSchemaName, tenantViewName); String globalViewCondition = String.format("KP = '%s'", entityKeyPrefix); @@ -787,7 +1027,7 @@ public class PhoenixTestBuilder { if (tableIndexEnabled && !tableIndexCreated) { String indexOnTableName = - SchemaUtil.getEscapedArgument(String.format("IDX_%s", + SchemaUtil.normalizeIdentifier(String.format("IDX_%s", SchemaUtil.normalizeIdentifier(tableName))); globalConnection.createStatement().execute( buildCreateIndexStmt(indexOnTableName, entityTableName,