This is an automated email from the ASF dual-hosted git repository. pboado pushed a commit to branch 5.x-cdh6 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit e62d06126733ccf71c256448dd7e0580e25ce411 Author: Gokcen Iskender <gisken...@salesforce.com> AuthorDate: Fri Feb 15 22:07:01 2019 +0000 PHOENIX-5089 Add tenantId parameter to IndexScrutinyTool Signed-off-by: Geoffrey Jacoby <gjac...@apache.org> --- .../phoenix/end2end/IndexScrutinyToolIT.java | 1398 +++++++++++--------- .../mapreduce/index/IndexScrutinyMapper.java | 2 + .../mapreduce/index/IndexScrutinyTableOutput.java | 1 + .../phoenix/mapreduce/index/IndexScrutinyTool.java | 61 +- .../apache/phoenix/mapreduce/index/IndexTool.java | 5 +- .../phoenix/mapreduce/util/IndexColumnNames.java | 3 + 6 files changed, 820 insertions(+), 650 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java index 692a98c..046c3f0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java @@ -46,9 +46,12 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.CsvBulkImportUtil; import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput; import org.apache.phoenix.mapreduce.index.IndexScrutinyTool; @@ -58,6 +61,7 @@ import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters; import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.ConnectionQueryServices; import 
org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PhoenixRuntime; @@ -70,6 +74,7 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -81,681 +86,850 @@ import com.google.common.collect.Sets; * Tests for the {@link IndexScrutinyTool} */ @Category(NeedsOwnMiniClusterTest.class) -@RunWith(Parameterized.class) -public class IndexScrutinyToolIT extends BaseTest { +@RunWith(Enclosed.class) +public class IndexScrutinyToolIT { + + abstract public static class SharedIndexToolIT extends BaseTest { + protected String outputDir; + + @BeforeClass public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMap(); + //disable major compactions + serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0"); + Map<String, String> clientProps = Maps.newHashMap(); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + } - private String dataTableDdl; - private String indexTableDdl; + protected List<Job> runScrutiny(String[] cmdArgs) throws Exception { + IndexScrutinyTool scrutiny = new IndexScrutinyTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + scrutiny.setConf(conf); + int status = scrutiny.run(cmdArgs); + assertEquals(0, status); + for (Job job : scrutiny.getJobs()) { + assertTrue(job.waitForCompletion(true)); + } + return scrutiny.getJobs(); + } - private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)"; + protected String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, + SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows, String tenantId, Long scrutinyTs) { + final 
List<String> args = Lists.newArrayList(); + if (schemaName != null) { + args.add("-s"); + args.add(schemaName); + } + args.add("-dt"); + args.add(dataTable); + args.add("-it"); + args.add(indxTable); + + // TODO test snapshot reads + // if(useSnapshot) { + // args.add("-snap"); + // } + + if (OutputFormat.FILE.equals(outputFormat)) { + args.add("-op"); + outputDir = "/tmp/" + UUID.randomUUID().toString(); + args.add(outputDir); + } - private static final String INDEX_UPSERT_SQL = - "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)"; + args.add("-t"); + args.add(String.valueOf(scrutinyTs)); + args.add("-run-foreground"); + if (batchSize != null) { + args.add("-b"); + args.add(String.valueOf(batchSize)); + } - private static final String DELETE_SQL = "DELETE FROM %s "; + // default to using data table as the source table + args.add("-src"); + if (sourceTable == null) { + args.add(SourceTable.DATA_TABLE_SOURCE.name()); + } else { + args.add(sourceTable.name()); + } + if (outputInvalidRows) { + args.add("-o"); + } + if (outputFormat != null) { + args.add("-of"); + args.add(outputFormat.name()); + } + if (maxOutputRows != null) { + args.add("-om"); + args.add(maxOutputRows.toString()); + } + if (tenantId != null) { + args.add("-tenant"); + args.add(tenantId); + } + return args.toArray(new String[0]); + } - private String schemaName; - private String dataTableName; - private String dataTableFullName; - private String indexTableName; - private String indexTableFullName; - private String outputDir; + protected long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) { + return counters.findCounter(counter).getValue(); + } + } - private Connection conn; + @RunWith(Parameterized.class) public static class IndexScrutinyToolNonTenantIT extends SharedIndexToolIT { - private PreparedStatement dataTableUpsertStmt; + private String dataTableDdl; + private String indexTableDdl; - private PreparedStatement 
indexTableUpsertStmt; + private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)"; - private long testTime; - private Properties props; + private static final String + INDEX_UPSERT_SQL = + "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)"; - @Parameterized.Parameters - public static Collection<Object[]> data() { - return Arrays.asList(new Object[][] { - { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, - { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, - { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } - }); - } + private static final String DELETE_SQL = "DELETE FROM %s "; - public IndexScrutinyToolIT(String dataTableDdl, String indexTableDdl) { - this.dataTableDdl = dataTableDdl; - this.indexTableDdl = indexTableDdl; - } + private String schemaName; + private String dataTableName; + private String dataTableFullName; + private String indexTableName; + private String indexTableFullName; - @BeforeClass - public static void doSetup() throws Exception { - Map<String, String> serverProps = Maps.newHashMap(); - //disable major compactions - serverProps.put(HConstants.MAJOR_COMPACTION_PERIOD, "0"); - Map<String, String> clientProps = Maps.newHashMap(); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); - } + private Connection conn; - /** - * Create the test data and index tables - */ - @Before - public void setup() throws SQLException { - generateUniqueTableNames(); - createTestTable(getUrl(), 
String.format(dataTableDdl, dataTableFullName)); - createTestTable(getUrl(), - String.format(indexTableDdl, indexTableName, dataTableFullName)); - props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - conn = DriverManager.getConnection(getUrl(), props); - String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName); - dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert); - String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName); - indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert); - conn.setAutoCommit(false); - testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000; + private PreparedStatement dataTableUpsertStmt; - } + private PreparedStatement indexTableUpsertStmt; - @After - public void teardown() throws SQLException { - if (conn != null) { - conn.close(); + private long testTime; + private Properties props; + + @Parameterized.Parameters public static Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)", + "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", + "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" }, { "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR) SALT_BUCKETS=2", + "CREATE LOCAL INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)" } }); } - } - /** - * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid. 
- */ - @Test - public void testValidIndex() throws Exception { - // insert two rows - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - conn.commit(); - - int numDataRows = countRows(dataTableFullName); - int numIndexRows = countRows(indexTableFullName); - - // scrutiny should report everything as ok - List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); - - // make sure row counts weren't modified by scrutiny - assertEquals(numDataRows, countRows(dataTableFullName)); - assertEquals(numIndexRows, countRows(indexTableFullName)); - } + public IndexScrutinyToolNonTenantIT(String dataTableDdl, String indexTableDdl) { + this.dataTableDdl = dataTableDdl; + this.indexTableDdl = indexTableDdl; + } - /** - * Tests running a scrutiny while updates and deletes are happening. - * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue. 
- */ - @Test - @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS scanner") - public void testScrutinyWhileTakingWrites() throws Exception { - int id = 0; - while (id < 1000) { - int index = 1; - dataTableUpsertStmt.setInt(index++, id); - dataTableUpsertStmt.setString(index++, "name-" + id); - dataTableUpsertStmt.setInt(index++, id); - dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime)); - dataTableUpsertStmt.executeUpdate(); - id++; - } - conn.commit(); - - //CURRENT_SCN for scrutiny - long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis(); - - // launch background upserts and deletes - final Random random = new Random(0); - Runnable backgroundUpserts = new Runnable() { - @Override - public void run() { - int idToUpsert = random.nextInt(1000); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - PreparedStatement dataPS = - conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName)); - upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000); - conn.commit(); - } catch (SQLException e) { - e.printStackTrace(); - } + /** + * Create the test data and index tables + */ + @Before public void setup() throws SQLException { + generateUniqueTableNames(); + createTestTable(getUrl(), String.format(dataTableDdl, dataTableFullName)); + createTestTable(getUrl(), String.format(indexTableDdl, indexTableName, dataTableFullName)); + props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName); + dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert); + String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName); + indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert); + conn.setAutoCommit(false); + testTime = EnvironmentEdgeManager.currentTimeMillis() - 1000; + + } + + @After public void teardown() throws SQLException { + if (conn != null) { + 
conn.close(); } - }; - Runnable backgroundDeletes = new Runnable() { - @Override - public void run() { - int idToDelete = random.nextInt(1000); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - String deleteSql = - String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" - + idToDelete; - conn.createStatement().executeUpdate(deleteSql); - conn.commit(); - } catch (SQLException e) { - e.printStackTrace(); - } + } + + /** + * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid. + */ + @Test public void testValidIndex() throws Exception { + // insert two rows + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + int numDataRows = countRows(dataTableFullName); + int numIndexRows = countRows(indexTableFullName); + + // scrutiny should report everything as ok + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + + // make sure row counts weren't modified by scrutiny + assertEquals(numDataRows, countRows(dataTableFullName)); + assertEquals(numIndexRows, countRows(indexTableFullName)); + } + + /** + * Tests running a scrutiny while updates and deletes are happening. + * Since CURRENT_SCN is set, the scrutiny shouldn't report any issue. 
+ */ + @Test @Ignore("PHOENIX-4378 Unable to set KEEP_DELETED_CELLS to true on RS scanner") + public void testScrutinyWhileTakingWrites() throws Exception { + int id = 0; + while (id < 1000) { + int index = 1; + dataTableUpsertStmt.setInt(index++, id); + dataTableUpsertStmt.setString(index++, "name-" + id); + dataTableUpsertStmt.setInt(index++, id); + dataTableUpsertStmt.setTimestamp(index++, new Timestamp(testTime)); + dataTableUpsertStmt.executeUpdate(); + id++; } - }; - ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); - scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, - TimeUnit.MILLISECONDS); - scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, - TimeUnit.MILLISECONDS); - - // scrutiny should report everything as ok - List<Job> completedJobs = - runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, - scrutinyTS); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); - scheduledThreadPool.shutdown(); - scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS); - } + conn.commit(); + + //CURRENT_SCN for scrutiny + long scrutinyTS = EnvironmentEdgeManager.currentTimeMillis(); + + // launch background upserts and deletes + final Random random = new Random(0); + Runnable backgroundUpserts = new Runnable() { + @Override public void run() { + int idToUpsert = random.nextInt(1000); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + PreparedStatement dataPS = + conn.prepareStatement(String.format(UPSERT_SQL, dataTableFullName)); + upsertRow(dataPS, idToUpsert, "modified-" + idToUpsert, idToUpsert + 1000); + conn.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + }; + Runnable backgroundDeletes = new Runnable() { + @Override public void run() { + 
int idToDelete = random.nextInt(1000); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String deleteSql = + String.format(DELETE_SQL, indexTableFullName) + "WHERE \":ID\"=" + + idToDelete; + conn.createStatement().executeUpdate(deleteSql); + conn.commit(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + }; + ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2); + scheduledThreadPool.scheduleWithFixedDelay(backgroundUpserts, 200, 200, TimeUnit.MILLISECONDS); + scheduledThreadPool.scheduleWithFixedDelay(backgroundDeletes, 200, 200, TimeUnit.MILLISECONDS); + + // scrutiny should report everything as ok + List<Job> completedJobs = + runScrutinyCurrentSCN(schemaName, dataTableName, indexTableName, scrutinyTS); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1000, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + scheduledThreadPool.shutdown(); + scheduledThreadPool.awaitTermination(10000, TimeUnit.MILLISECONDS); + } - /** - * Tests an index with the same # of rows as the data table, but one of the index rows is - * incorrect Scrutiny should report the invalid rows. 
- */ - @Test - public void testEqualRowCountIndexIncorrect() throws Exception { - // insert one valid row - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - conn.commit(); - - // disable the index and insert another row which is not indexed - disableIndex(); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - conn.commit(); - - // insert a bad row into the index - upsertIndexRow("badName", 2, 9999); - conn.commit(); - - // scrutiny should report the bad row - List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); - } + /** + * Tests an index with the same # of rows as the data table, but one of the index rows is + * incorrect Scrutiny should report the invalid rows. + */ + @Test public void testEqualRowCountIndexIncorrect() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + // insert a bad row into the index + upsertIndexRow("badName", 2, 9999); + conn.commit(); + + // scrutiny should report the bad row + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + } - /** - * Tests an index where the index pk is correct (indexed col values are indexed correctly), but - * a covered index value is incorrect. 
Scrutiny should report the invalid row - */ - @Test - public void testCoveredValueIncorrect() throws Exception { - // insert one valid row - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - conn.commit(); - - // disable index and insert another data row - disableIndex(); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - conn.commit(); - - // insert a bad index row for the above data row - upsertIndexRow("name-2", 2, 9999); - conn.commit(); - - // scrutiny should report the bad row - List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT)); - } + /** + * Tests an index where the index pk is correct (indexed col values are indexed correctly), but + * a covered index value is incorrect. 
Scrutiny should report the invalid row + */ + @Test public void testCoveredValueIncorrect() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable index and insert another data row + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + // insert a bad index row for the above data row + upsertIndexRow("name-2", 2, 9999); + conn.commit(); + + // scrutiny should report the bad row + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT)); + } - /** - * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs - * scrutiny with batchsize of 10, - */ - @Test - public void testBatching() throws Exception { - // insert 1001 data and index rows - int numTestRows = 1001; - for (int i = 0; i < numTestRows; i++) { - upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000); - } - conn.commit(); - - disableIndex(); - - // randomly delete some rows from the index - Random random = new Random(); - for (int i = 0; i < 100; i++) { - int idToDelete = random.nextInt(numTestRows); - deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete); - } - conn.commit(); - int numRows = countRows(indexTableFullName); - int numDeleted = numTestRows - numRows; - - // run scrutiny with batch size of 10 - List<Job> completedJobs = - runScrutiny(schemaName, dataTableName, indexTableName, 10L); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(numDeleted, 
getCounterValue(counters, INVALID_ROW_COUNT)); - assertEquals(numTestRows / 10 + numTestRows % 10, - getCounterValue(counters, BATCHES_PROCESSED_COUNT)); - } + /** + * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs + * scrutiny with batchsize of 10, + */ + @Test public void testBatching() throws Exception { + // insert 1001 data and index rows + int numTestRows = 1001; + for (int i = 0; i < numTestRows; i++) { + upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000); + } + conn.commit(); - /** - * Tests when there are more data table rows than index table rows Scrutiny should report the - * number of incorrect rows - */ - @Test - public void testMoreDataRows() throws Exception { - upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); - conn.commit(); - disableIndex(); - // these rows won't have a corresponding index row - upsertRow(dataTableUpsertStmt, 2, "name-2", 95124); - upsertRow(dataTableUpsertStmt, 3, "name-3", 95125); - conn.commit(); - - List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); - Job job = completedJobs.get(0); - assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); - } + disableIndex(); - /** - * Tests when there are more index table rows than data table rows Scrutiny should report the - * number of incorrect rows when run with the index as the source table - */ - @Test - public void testMoreIndexRows() throws Exception { - upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); - conn.commit(); - disableIndex(); - // these index rows won't have a corresponding data row - upsertIndexRow("name-2", 2, 95124); - upsertIndexRow("name-3", 3, 95125); - conn.commit(); - - List<Job> completedJobs = - runScrutiny(schemaName, dataTableName, indexTableName, 10L, - SourceTable.INDEX_TABLE_SOURCE); - Job job = completedJobs.get(0); - 
assertTrue(job.isSuccessful()); - Counters counters = job.getCounters(); - assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); - } + // randomly delete some rows from the index + Random random = new Random(); + for (int i = 0; i < 100; i++) { + int idToDelete = random.nextInt(numTestRows); + deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete); + } + conn.commit(); + int numRows = countRows(indexTableFullName); + int numDeleted = numTestRows - numRows; - /** - * Tests running with both the index and data tables as the source table If we have an - * incorrectly indexed row, it should be reported in each direction - */ - @Test - public void testBothDataAndIndexAsSource() throws Exception { - // insert one valid row - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - conn.commit(); - - // disable the index and insert another row which is not indexed - disableIndex(); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - conn.commit(); - - // insert a bad row into the index - upsertIndexRow("badName", 2, 9999); - conn.commit(); - - List<Job> completedJobs = - runScrutiny(schemaName, dataTableName, indexTableName, 10L, - SourceTable.BOTH); - assertEquals(2, completedJobs.size()); - for (Job job : completedJobs) { + // run scrutiny with batch size of 10 + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName, 10L); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT)); + assertEquals(numTestRows / 10 + numTestRows % 10, + getCounterValue(counters, BATCHES_PROCESSED_COUNT)); + } + + /** + * Tests when there are more data table rows than index table rows Scrutiny should report the + * number of incorrect rows + */ + @Test public void 
testMoreDataRows() throws Exception { + upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); + conn.commit(); + disableIndex(); + // these rows won't have a corresponding index row + upsertRow(dataTableUpsertStmt, 2, "name-2", 95124); + upsertRow(dataTableUpsertStmt, 3, "name-3", 95125); + conn.commit(); + + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); assertTrue(job.isSuccessful()); Counters counters = job.getCounters(); assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); - assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); + } + + /** + * Tests when there are more index table rows than data table rows Scrutiny should report the + * number of incorrect rows when run with the index as the source table + */ + @Test public void testMoreIndexRows() throws Exception { + upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); + conn.commit(); + disableIndex(); + // these index rows won't have a corresponding data row + upsertIndexRow("name-2", 2, 95124); + upsertIndexRow("name-3", 3, 95125); + conn.commit(); + + List<Job> completedJobs = + runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); + } + + /** + * Tests running with both the index and data tables as the source table If we have an + * incorrectly indexed row, it should be reported in each direction + */ + @Test public void testBothDataAndIndexAsSource() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, 
"name-2", 95123); + conn.commit(); + + // insert a bad row into the index + upsertIndexRow("badName", 2, 9999); + conn.commit(); + + List<Job> completedJobs = + runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.BOTH); + assertEquals(2, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + } } - } - /** - * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file - */ - @Test - public void testOutputInvalidRowsToFile() throws Exception { - insertOneValid_OneBadVal_OneMissingTarget(); - - String[] argValues = - getArgValues(schemaName, dataTableName, indexTableName, 10L, - SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null); - runScrutiny(argValues); - - // check the output files - Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName); - DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem(); - List<Path> paths = Lists.newArrayList(); - Path firstPart = null; - for (FileStatus outputFile : fs.listStatus(outputPath)) { - if (outputFile.getPath().getName().startsWith("part")) { - if (firstPart == null) { - firstPart = outputFile.getPath(); - } else { - paths.add(outputFile.getPath()); + /** + * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file + */ + @Test public void testOutputInvalidRowsToFile() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + + String[] argValues = + getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null); + runScrutiny(argValues); + + // check the output files + Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName); + DistributedFileSystem fs = 
getUtility().getDFSCluster().getFileSystem(); + List<Path> paths = Lists.newArrayList(); + Path firstPart = null; + for (FileStatus outputFile : fs.listStatus(outputPath)) { + if (outputFile.getPath().getName().startsWith("part")) { + if (firstPart == null) { + firstPart = outputFile.getPath(); + } else { + paths.add(outputFile.getPath()); + } } } + if (dataTableDdl.contains("SALT_BUCKETS")) { + fs.concat(firstPart, paths.toArray(new Path[0])); + } + Path outputFilePath = firstPart; + assertTrue(fs.exists(outputFilePath)); + FSDataInputStream fsDataInputStream = fs.open(outputFilePath); + BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream)); + TreeSet<String> lines = Sets.newTreeSet(); + try { + String line = null; + while ((line = reader.readLine()) != null) { + lines.add(line); + } + } finally { + IOUtils.closeQuietly(reader); + IOUtils.closeQuietly(fsDataInputStream); + } + Iterator<String> lineIterator = lines.iterator(); + assertEquals( + "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", lineIterator.next()); + assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", + lineIterator.next()); + + } + + /** + * Tests writing of results to the output table + */ + @Test public void testOutputInvalidRowsToTable() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + String[] argValues = + getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null); + List<Job> completedJobs = runScrutiny(argValues); + + // check that the output table contains the invalid rows + long scrutinyTimeMillis = + PhoenixConfigurationUtil + .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); + String invalidRowsQuery = + IndexScrutinyTableOutput + .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis); + ResultSet rs = 
conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc"); + assertTrue(rs.next()); + assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); + assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); + assertTrue(rs.getBoolean("HAS_TARGET_ROW")); + assertEquals(2, rs.getInt("ID")); + assertEquals(2, rs.getInt(":ID")); + assertEquals(95123, rs.getInt("ZIP")); + assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect + assertTrue(rs.next()); + assertEquals(dataTableFullName, rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); + assertEquals(indexTableFullName, rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); + assertFalse(rs.getBoolean("HAS_TARGET_ROW")); + assertEquals(3, rs.getInt("ID")); + assertEquals(null, rs.getObject(":ID")); // null for missing target row + assertFalse(rs.next()); + + // check that the job results were written correctly to the metadata table + assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery); } - if (dataTableDdl.contains("SALT_BUCKETS")) { - fs.concat(firstPart, paths.toArray(new Path[0])); - } - Path outputFilePath = firstPart; - assertTrue(fs.exists(outputFilePath)); - FSDataInputStream fsDataInputStream = fs.open(outputFilePath); - BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream)); - TreeSet<String> lines = Sets.newTreeSet(); - try { - String line = null; - while ((line = reader.readLine()) != null) { - lines.add(line); + + /** + * Tests that the config for max number of output rows is observed + */ + @Test public void testMaxOutputRows() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + // set max to 1. 
There are two bad rows, but only 1 should get written to output table + String[] argValues = + getArgValues(schemaName, dataTableName, indexTableName, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1)); + List<Job> completedJobs = runScrutiny(argValues); + long scrutinyTimeMillis = + PhoenixConfigurationUtil + .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); + String invalidRowsQuery = + IndexScrutinyTableOutput + .getSqlQueryAllInvalidRows(conn, getColNames(), scrutinyTimeMillis); + ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery); + assertTrue(rs.next()); + if (dataTableDdl.contains("SALT_BUCKETS")) { + assertTrue(rs.next()); + assertFalse(rs.next()); + } else { + assertFalse(rs.next()); } - } finally { - IOUtils.closeQuietly(reader); - IOUtils.closeQuietly(fsDataInputStream); } - Iterator<String> lineIterator = lines.iterator(); - assertEquals( - "[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime) - .toString() + ", 9999]", lineIterator.next()); - assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", - lineIterator.next()); - } + private SourceTargetColumnNames getColNames() throws SQLException { + PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName); + PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName); + SourceTargetColumnNames columnNames = + new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable); + return columnNames; + } - /** - * Tests writing of results to the output table - */ - @Test - public void testOutputInvalidRowsToTable() throws Exception { - insertOneValid_OneBadVal_OneMissingTarget(); - String[] argValues = - getArgValues(schemaName, dataTableName, indexTableName, 10L, - SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null); - List<Job> completedJobs = runScrutiny(argValues); - - // check that the output table contains 
the invalid rows - long scrutinyTimeMillis = - PhoenixConfigurationUtil - .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); - String invalidRowsQuery = - IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(), - scrutinyTimeMillis); - ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc"); - assertTrue(rs.next()); - assertEquals(dataTableFullName, - rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); - assertEquals(indexTableFullName, - rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); - assertTrue(rs.getBoolean("HAS_TARGET_ROW")); - assertEquals(2, rs.getInt("ID")); - assertEquals(2, rs.getInt(":ID")); - assertEquals(95123, rs.getInt("ZIP")); - assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect - assertTrue(rs.next()); - assertEquals(dataTableFullName, - rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); - assertEquals(indexTableFullName, - rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); - assertFalse(rs.getBoolean("HAS_TARGET_ROW")); - assertEquals(3, rs.getInt("ID")); - assertEquals(null, rs.getObject(":ID")); // null for missing target row - assertFalse(rs.next()); - - // check that the job results were written correctly to the metadata table - assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery); - } + // inserts one valid data/index row, one data row with a missing index row, + // and one data row with an index row that has a bad covered col val + private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + upsertRow(dataTableUpsertStmt, 3, "name-3", 95123); + conn.commit(); + + // insert a bad index row for one of the above data rows + 
upsertIndexRow("name-2", 2, 9999); + conn.commit(); + } + + private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, + String invalidRowsQuery) throws SQLException { + ResultSet rs; + ResultSet metadataRs = + IndexScrutinyTableOutput + .queryAllMetadata(conn, dataTableFullName, indexTableFullName, + scrutinyTimeMillis); + assertTrue(metadataRs.next()); + List<? extends Object> + expected = + Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, + SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, + 2L, 1L, 1L, + "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", + "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", + invalidRowsQuery); + if (dataTableDdl.contains("SALT_BUCKETS")) { + expected = + Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, + SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, + 2L, 1L, 2L, + "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", + "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", + invalidRowsQuery); + } - /** - * Tests that the config for max number of output rows is observed - */ - @Test - public void testMaxOutputRows() throws Exception { - insertOneValid_OneBadVal_OneMissingTarget(); - // set max to 1. 
There are two bad rows, but only 1 should get written to output table - String[] argValues = - getArgValues(schemaName, dataTableName, indexTableName, 10L, - SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1)); - List<Job> completedJobs = runScrutiny(argValues); - long scrutinyTimeMillis = - PhoenixConfigurationUtil - .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); - String invalidRowsQuery = - IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(), - scrutinyTimeMillis); - ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery); - assertTrue(rs.next()); - if (dataTableDdl.contains("SALT_BUCKETS")) { + assertRsValues(metadataRs, expected); + String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET"); + rs = conn.createStatement().executeQuery(missingTargetQuery); assertTrue(rs.next()); + assertEquals(3, rs.getInt("ID")); assertFalse(rs.next()); - } else { + String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL"); + rs = conn.createStatement().executeQuery(badCoveredColQuery); + assertTrue(rs.next()); + assertEquals(2, rs.getInt("ID")); assertFalse(rs.next()); } - } - private SourceTargetColumnNames getColNames() throws SQLException { - PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName); - PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName); - SourceTargetColumnNames columnNames = - new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable); - return columnNames; - } + // assert the result set contains the expected values in the given order + private void assertRsValues(ResultSet rs, List<? 
extends Object> expected) + throws SQLException { + for (int i = 0; i < expected.size(); i++) { + assertEquals(expected.get(i), rs.getObject(i + 1)); + } + } - // inserts one valid data/index row, one data row with a missing index row, - // and one data row with an index row that has a bad covered col val - private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException { - // insert one valid row - upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); - conn.commit(); - - // disable the index and insert another row which is not indexed - disableIndex(); - upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); - upsertRow(dataTableUpsertStmt, 3, "name-3", 95123); - conn.commit(); - - // insert a bad index row for one of the above data rows - upsertIndexRow("name-2", 2, 9999); - conn.commit(); - } + private void generateUniqueTableNames() { + schemaName = generateUniqueName(); + dataTableName = generateUniqueName(); + dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + indexTableName = generateUniqueName(); + indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + } - private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, - String invalidRowsQuery) throws SQLException { - ResultSet rs; - ResultSet metadataRs = - IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName, - indexTableFullName, scrutinyTimeMillis); - assertTrue(metadataRs.next()); - List<? 
extends Object> expected = - Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, - SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, - 2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", - "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery); - if (dataTableDdl.contains("SALT_BUCKETS")) { - expected = Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, - SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, - 2L, 1L, 2L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", - "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery); - } - - assertRsValues(metadataRs, expected); - String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET"); - rs = conn.createStatement().executeQuery(missingTargetQuery); - assertTrue(rs.next()); - assertEquals(3, rs.getInt("ID")); - assertFalse(rs.next()); - String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL"); - rs = conn.createStatement().executeQuery(badCoveredColQuery); - assertTrue(rs.next()); - assertEquals(2, rs.getInt("ID")); - assertFalse(rs.next()); - } + private int countRows(String tableFullName) throws SQLException { + ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName); + count.next(); + int numRows = count.getInt(1); + return numRows; + } - // assert the result set contains the expected values in the given order - private void assertRsValues(ResultSet rs, List<? 
extends Object> expected) throws SQLException { - for (int i = 0; i < expected.size(); i++) { - assertEquals(expected.get(i), rs.getObject(i + 1)); + private void upsertIndexRow(String name, int id, int zip) throws SQLException { + indexTableUpsertStmt.setString(1, name); + indexTableUpsertStmt.setInt(2, id); // id + indexTableUpsertStmt.setInt(3, zip); // bad zip + indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime)); + indexTableUpsertStmt.executeUpdate(); } - } - private void generateUniqueTableNames() { - schemaName = generateUniqueName(); - dataTableName = generateUniqueName(); - dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - indexTableName = generateUniqueName(); - indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - } + private void disableIndex() throws SQLException { + conn.createStatement().execute( + String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName)); + conn.commit(); + } - private int countRows(String tableFullName) throws SQLException { - ResultSet count = conn.createStatement().executeQuery("select count(*) from " + tableFullName); - count.next(); - int numRows = count.getInt(1); - return numRows; - } + private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, + SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, Long maxOutputRows) { + return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable, + outputInvalidRows, outputFormat, maxOutputRows, null, Long.MAX_VALUE); + } - private void upsertIndexRow(String name, int id, int zip) throws SQLException { - indexTableUpsertStmt.setString(1, name); - indexTableUpsertStmt.setInt(2, id); // id - indexTableUpsertStmt.setInt(3, zip); // bad zip - indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime)); - indexTableUpsertStmt.executeUpdate(); - } + private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, 
String indexTableName, Long scrutinyTS) throws Exception { + return runScrutiny( + getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, + false, null, null, null, scrutinyTS)); + } - private void disableIndex() throws SQLException { - conn.createStatement().execute( - String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName)); - conn.commit(); - } + private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception { + return runScrutiny(schemaName, dataTableName, indexTableName, null, null); + } - private long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) { - return counters.findCounter(counter).getValue(); - } + private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName, + Long batchSize) throws Exception { + return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null); + } - private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, - SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, - Long maxOutputRows) { - return getArgValues(schemaName, dataTable, indxTable, batchSize, sourceTable, - outputInvalidRows, outputFormat, maxOutputRows, Long.MAX_VALUE); - } + private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName, + Long batchSize, SourceTable sourceTable) throws Exception { + final String[] cmdArgs = + getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, + false, null, null, null, Long.MAX_VALUE); + return runScrutiny(cmdArgs); + } - private String[] getArgValues(String schemaName, String dataTable, String indxTable, Long batchSize, - SourceTable sourceTable, boolean outputInvalidRows, OutputFormat outputFormat, - Long maxOutputRows, Long scrutinyTs) { - final List<String> args = Lists.newArrayList(); - if (schemaName != null) { - args.add("-s"); - 
args.add(schemaName); - } - args.add("-dt"); - args.add(dataTable); - args.add("-it"); - args.add(indxTable); - - // TODO test snapshot reads - // if(useSnapshot) { - // args.add("-snap"); - // } - - if (OutputFormat.FILE.equals(outputFormat)) { - args.add("-op"); - outputDir = "/tmp/" + UUID.randomUUID().toString(); - args.add(outputDir); - } - args.add("-t"); - args.add(String.valueOf(scrutinyTs)); - args.add("-run-foreground"); - if (batchSize != null) { - args.add("-b"); - args.add(String.valueOf(batchSize)); - } - - // default to using data table as the source table - args.add("-src"); - if (sourceTable == null) { - args.add(SourceTable.DATA_TABLE_SOURCE.name()); - } else { - args.add(sourceTable.name()); - } - if (outputInvalidRows) { - args.add("-o"); - } - if (outputFormat != null) { - args.add("-of"); - args.add(outputFormat.name()); - } - if (maxOutputRows != null) { - args.add("-om"); - args.add(maxOutputRows.toString()); - } - return args.toArray(new String[0]); - } + private void upsertRow(PreparedStatement stmt, int id, String name, int zip) + throws SQLException { + int index = 1; + // insert row + stmt.setInt(index++, id); + stmt.setString(index++, name); + stmt.setInt(index++, zip); + stmt.setTimestamp(index++, new Timestamp(testTime)); + stmt.executeUpdate(); + } - private List<Job> runScrutinyCurrentSCN(String schemaName, String dataTableName, String indexTableName, Long scrutinyTS) throws Exception { - return runScrutiny(getArgValues(schemaName, dataTableName, indexTableName, null, SourceTable.BOTH, false, null, null, scrutinyTS)); - } + private int deleteRow(String fullTableName, String whereCondition) throws SQLException { + String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition; + PreparedStatement deleteStmt = conn.prepareStatement(deleteSql); + return deleteStmt.executeUpdate(); + } - private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) throws Exception { - return 
runScrutiny(schemaName, dataTableName, indexTableName, null, null); } - private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName, - Long batchSize) throws Exception { - return runScrutiny(schemaName, dataTableName, indexTableName, batchSize, null); - } + public static class IndexScrutinyToolTenantIT extends SharedIndexToolIT { + private Connection connGlobal = null; + private Connection connTenant = null; - private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName, - Long batchSize, SourceTable sourceTable) throws Exception { - final String[] cmdArgs = - getArgValues(schemaName, dataTableName, indexTableName, batchSize, sourceTable, false, - null, null, Long.MAX_VALUE); - return runScrutiny(cmdArgs); - } + private String tenantId; + private String tenantViewName; + private String indexNameTenant; + private String multiTenantTable; + private String viewIndexTableName; + + private final String createViewStr = "CREATE VIEW %s AS SELECT * FROM %s"; + private final String + upsertQueryStr = + "UPSERT INTO %s (COL1, ID, NAME) VALUES('%s' , %d, '%s')"; + private final String createIndexStr = "CREATE INDEX %s ON %s (NAME) "; + + /** + * Create the test data + */ + @Before public void setup() throws SQLException { + tenantId = generateUniqueName(); + tenantViewName = generateUniqueName(); + indexNameTenant = generateUniqueName(); + multiTenantTable = generateUniqueName(); + viewIndexTableName = "_IDX_" + multiTenantTable; + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + connGlobal = DriverManager.getConnection(getUrl(), props); + + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + connTenant = DriverManager.getConnection(getUrl(), props); + String + createTblStr = + "CREATE TABLE %s (COL1 VARCHAR(15) NOT NULL,ID INTEGER NOT NULL" + ", NAME VARCHAR, CONSTRAINT PK_1 PRIMARY KEY (COL1, ID)) MULTI_TENANT=true"; + + createTestTable(getUrl(), String.format(createTblStr, 
multiTenantTable)); + + connTenant.createStatement().execute( + String.format(createViewStr, tenantViewName, multiTenantTable)); - private List<Job> runScrutiny(String[] cmdArgs) throws Exception { - IndexScrutinyTool scrutiny = new IndexScrutinyTool(); - Configuration conf = new Configuration(getUtility().getConfiguration()); - scrutiny.setConf(conf); - int status = scrutiny.run(cmdArgs); - assertEquals(0, status); - for (Job job : scrutiny.getJobs()) { - assertTrue(job.waitForCompletion(true)); + String idxStmtTenant = String.format(createIndexStr, indexNameTenant, tenantViewName); + connTenant.createStatement().execute(idxStmtTenant); } - return scrutiny.getJobs(); - } - private void upsertRow(PreparedStatement stmt, int id, String name, int zip) - throws SQLException { - int index = 1; - // insert row - stmt.setInt(index++, id); - stmt.setString(index++, name); - stmt.setInt(index++, zip); - stmt.setTimestamp(index++, new Timestamp(testTime)); - stmt.executeUpdate(); - } + @After public void teardown() throws SQLException { + if (connGlobal != null) { + connGlobal.close(); + } + if (connTenant != null) { + connTenant.close(); + } + } - private int deleteRow(String fullTableName, String whereCondition) throws SQLException { - String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition; - PreparedStatement deleteStmt = conn.prepareStatement(deleteSql); - return deleteStmt.executeUpdate(); - } + /** + * Tests that the config for max number of output rows is observed + */ + @Test public void testTenantViewAndIndexEqual() throws Exception { + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); + connTenant.commit(); + + String[] argValues = + getArgValues("", tenantViewName, indexNameTenant, 10L, + SourceTable.INDEX_TABLE_SOURCE, false, null, null, tenantId, + EnvironmentEdgeManager.currentTimeMillis()); + + List<Job> completedJobs = runScrutiny(argValues); + // Sunny case, both index and 
view are equal. 1 row + assertEquals(1, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } + /** + * Tests global view on multi-tenant table should work too + **/ + @Test public void testGlobalViewOnMultiTenantTable() throws Exception { + String globalViewName = generateUniqueName(); + String indexNameGlobal = generateUniqueName(); + + connGlobal.createStatement().execute( + String.format(createViewStr, globalViewName, multiTenantTable)); + + String idxStmtGlobal = String.format(createIndexStr, indexNameGlobal, globalViewName); + connGlobal.createStatement().execute(idxStmtGlobal); + connGlobal.createStatement() + .execute(String.format(upsertQueryStr, globalViewName, "global", 5, "x")); + connGlobal.commit(); + String[] argValues = + getArgValues("", globalViewName, indexNameGlobal, 10L, + SourceTable.INDEX_TABLE_SOURCE, false, null, null, null, + EnvironmentEdgeManager.currentTimeMillis()); + List<Job> completedJobs = runScrutiny(argValues); + // Sunny case, both index and view are equal. 1 row + assertEquals(1, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } + + /** + * Use Both as source. Add 1 row to tenant view and disable index. + * Add 1 more to view and add a wrong row to index. + * Both have 1 invalid row, 1 valid row. 
+ **/ + @Test + public void testOneValidOneInvalidUsingBothAsSource() throws Exception { + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); + connTenant.commit(); + connTenant.createStatement().execute( + String.format("ALTER INDEX %s ON %S disable", indexNameTenant, tenantViewName)); + + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2")); + + connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')", + indexNameTenant, 5555, "wrongName")); + connTenant.commit(); + + String[] + argValues = + getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.BOTH, false, + null, null, tenantId, EnvironmentEdgeManager.currentTimeMillis()); + List<Job> completedJobs = runScrutiny(argValues); + + assertEquals(2, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } + + /** + * Add 3 rows to Tenant view. + * Empty index table and observe they are not equal. + * Use data table as source and output to file. + * Output to table doesn't work for tenantid connection because it can't create the scrutiny table as tenant. 
+ **/ + @Test public void testWithEmptyIndexTableOutputToFile() throws Exception{ + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 1, "x")); + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 2, "x2")); + connTenant.createStatement() + .execute(String.format(upsertQueryStr, tenantViewName, tenantId, 3, "x3")); + connTenant.createStatement().execute(String.format("UPSERT INTO %s (\":ID\", \"0:NAME\") values (%d, '%s')", + indexNameTenant, 5555, "wrongName")); + connTenant.commit(); + + ConnectionQueryServices queryServices = connGlobal.unwrap(PhoenixConnection.class).getQueryServices(); + Admin admin = queryServices.getAdmin(); + TableName tableName = TableName.valueOf(viewIndexTableName); + admin.disableTable(tableName); + admin.truncateTable(tableName, false); + + String[] + argValues = + getArgValues("", tenantViewName, indexNameTenant, 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null, + tenantId, EnvironmentEdgeManager.currentTimeMillis()); + List<Job> completedJobs = runScrutiny(argValues); + + assertEquals(1, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(0, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(3, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java index 81081bf..c424787 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import com.google.common.base.Strings; import org.apache.commons.codec.binary.Hex; 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Pair; @@ -93,6 +94,7 @@ public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWrit final Properties overrideProps = new Properties(); String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); + connection = ConnectionUtil.getOutputConnection(configuration, overrideProps); connection.setAutoCommit(false); batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java index 411214e..ac69bd8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java @@ -26,6 +26,7 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.List; +import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java index c321b3a..26d7336 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java @@ -24,6 +24,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; +import com.google.common.base.Strings; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -54,7 +55,10 @@ import org.apache.phoenix.mapreduce.util.ConnectionUtil; 
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; import org.apache.phoenix.parse.HintNode.Hint; +import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; @@ -66,6 +70,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import static org.apache.phoenix.util.MetaDataUtil.VIEW_INDEX_TABLE_PREFIX; + /** * An MR job to verify that the index table is in sync with the data table. * @@ -111,6 +117,8 @@ public class IndexScrutinyTool extends Configured implements Tool { private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true, "Output path where the files are written"); private static final Option OUTPUT_MAX = new Option("om", "output-max", true, "Max number of invalid rows to output per mapper. 
Defaults to 1M"); + private static final Option TENANT_ID_OPTION = new Option("tenant", "tenant-id", true, + "If specified, uses Tenant connection for tenant view index scrutiny (optional)"); public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]"; /** @@ -145,6 +153,7 @@ public class IndexScrutinyTool extends Configured implements Tool { options.addOption(TIMESTAMP); options.addOption(BATCH_SIZE_OPTION); options.addOption(SOURCE_TABLE_OPTION); + options.addOption(TENANT_ID_OPTION); return options; } @@ -202,10 +211,11 @@ public class IndexScrutinyTool extends Configured implements Tool { private String basePath; private long scrutinyExecuteTime; private long outputMaxRows; // per mapper + private String tenantId; public JobFactory(Connection connection, Configuration configuration, long batchSize, boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat, - String basePath, long outputMaxRows) { + String basePath, long outputMaxRows, String tenantId) { this.outputInvalidRows = outputInvalidRows; this.outputFormat = outputFormat; this.basePath = basePath; @@ -214,12 +224,16 @@ public class IndexScrutinyTool extends Configured implements Tool { this.connection = connection; this.configuration = configuration; this.useSnapshot = useSnapshot; + this.tenantId = tenantId; this.ts = ts; // CURRENT_SCN to set scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis(); // time at which scrutiny was run. 
// Same for // all jobs created from this factory PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration, scrutinyExecuteTime); + if (!Strings.isNullOrEmpty(tenantId)) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } } public Job createSubmittableJob(String schemaName, String indexTable, String dataTable, @@ -362,10 +376,17 @@ public class IndexScrutinyTool extends Configured implements Tool { printHelpAndExit(e.getMessage(), getOptions()); } final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf()); + boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt()); + String tenantId = null; + if (useTenantId) { + tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); + configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + LOG.info(String.format("IndexScrutinyTool uses a tenantId %s", tenantId)); + } connection = ConnectionUtil.getInputConnection(configuration); final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); - final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); + String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); String basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); @@ -388,7 +409,7 @@ public class IndexScrutinyTool extends Configured implements Tool { : EnvironmentEdgeManager.currentTimeMillis() - 60000; if (indexTable != null) { - if (!isValidIndexTable(connection, qDataTable, indexTable)) { + if (!IndexTool.isValidIndexTable(connection, qDataTable, indexTable, tenantId)) { throw new IllegalArgumentException(String .format(" %s is not an index table for %s ", indexTable, qDataTable)); } @@ -420,7 +441,7 @@ public class IndexScrutinyTool extends Configured 
implements Tool { outputFormat, outputMaxRows)); JobFactory jobFactory = new JobFactory(connection, configuration, batchSize, useSnapshot, ts, - outputInvalidRows, outputFormat, basePath, outputMaxRows); + outputInvalidRows, outputFormat, basePath, outputMaxRows, tenantId); // If we are running the scrutiny with both tables as the source, run two separate jobs, // one for each direction if (SourceTable.BOTH.equals(sourceTable)) { @@ -481,38 +502,6 @@ public class IndexScrutinyTool extends Configured implements Tool { return jobs; } - /** - * Checks for the validity of the index table passed to the job. - * @param connection - * @param masterTable - * @param indexTable - * @return - * @throws SQLException - */ - private boolean isValidIndexTable(final Connection connection, final String masterTable, - final String indexTable) throws SQLException { - final DatabaseMetaData dbMetaData = connection.getMetaData(); - final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable); - final String tableName = - SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable)); - - ResultSet rs = null; - try { - rs = dbMetaData.getIndexInfo("", schemaName, tableName, false, false); - while (rs.next()) { - final String indexName = rs.getString(6); - if (indexTable.equalsIgnoreCase(indexName)) { - return true; - } - } - } finally { - if (rs != null) { - rs.close(); - } - } - return false; - } - public static void main(final String[] args) throws Exception { int result = ToolRunner.run(new IndexScrutinyTool(), args); System.exit(result); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index f5e4001..ee2ae0b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -33,6 +33,7 @@ import java.util.HashMap; import 
java.util.List; import java.util.Map; +import com.google.common.base.Strings; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -339,7 +340,7 @@ public class IndexTool extends Configured implements Tool { ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); IndexMaintainer.serializeAdditional(pDataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class)); PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr); - if (tenantId != null) { + if (!Strings.isNullOrEmpty(tenantId)) { PhoenixConfigurationUtil.setTenantId(configuration, tenantId); } @@ -802,7 +803,7 @@ public class IndexTool extends Configured implements Tool { * @return * @throws SQLException */ - private boolean isValidIndexTable(final Connection connection, final String masterTable, + public static boolean isValidIndexTable(final Connection connection, final String masterTable, final String indexTable, final String tenantId) throws SQLException { final DatabaseMetaData dbMetaData = connection.getMetaData(); final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java index 6f2959f..df8c7ab 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java @@ -61,6 +61,9 @@ public class IndexColumnNames { if (pindexTable.getViewIndexId() != null) { offset++; } + if (pindexTable.isMultiTenant()) { + offset++; + } if (offset > 0) { pindexCols = pindexCols.subList(offset, pindexCols.size()); pkColumns = pkColumns.subList(offset, pkColumns.size());