PHOENIX-2460 Implement scrutiny command to validate whether or not an index is in sync with the data table (Vincent Poon)
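The scrutiny runs as a MapReduce job driven by the new IndexScrutinyTool. A minimal sketch of a programmatic invocation, mirroring IndexScrutinyToolIT#runScrutiny/getArgValues in this patch; the HBase configuration, schema and table names, timestamp and batch size below are placeholder assumptions, and only options exercised by the IT are shown:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;

public class ScrutinyExample {
    public static void main(String[] args) throws Exception {
        // Placeholder option values; the flags themselves come from getArgValues in the IT.
        String[] cmdArgs = new String[] {
            "-s", "MY_SCHEMA",                 // schema name
            "-dt", "MY_TABLE",                 // data table
            "-it", "MY_INDEX",                 // index table to check against
            "-src", "DATA_TABLE_SOURCE",       // iterate the data table, validate the index
            "-t", String.valueOf(System.currentTimeMillis()), // run timestamp passed to the tool
            "-b", "1000",                      // comparison batch size
            "-o",                              // output invalid rows
            "-of", "TABLE",                    // ... to the PHOENIX_INDEX_SCRUTINY table
            "-run-foreground" };
        IndexScrutinyTool scrutiny = new IndexScrutinyTool();
        scrutiny.setConf(HBaseConfiguration.create()); // assumes hbase-site.xml on the classpath
        int status = scrutiny.run(cmdArgs);            // 0 on success, as asserted in the IT
        System.exit(status);
    }
}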
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/08e2a29f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/08e2a29f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/08e2a29f Branch: refs/heads/4.x-HBase-0.98 Commit: 08e2a29f60bcedd107f96fc50cfeb8996296d56f Parents: ce6810f Author: James Taylor <jtay...@salesforce.com> Authored: Mon Aug 28 15:32:20 2017 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Mon Aug 28 15:40:56 2017 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/IndexScrutinyToolIT.java | 612 +++++++++++++++++++ .../mapreduce/index/IndexScrutinyMapper.java | 377 ++++++++++++ .../index/IndexScrutinyTableOutput.java | 345 +++++++++++ .../mapreduce/index/IndexScrutinyTool.java | 522 ++++++++++++++++ .../mapreduce/index/PhoenixIndexDBWritable.java | 11 +- .../index/PhoenixScrutinyJobCounters.java | 41 ++ .../index/SourceTargetColumnNames.java | 195 ++++++ .../mapreduce/util/IndexColumnNames.java | 239 ++++++++ .../util/PhoenixConfigurationUtil.java | 128 +++- .../java/org/apache/phoenix/util/QueryUtil.java | 63 +- .../org/apache/phoenix/util/SchemaUtil.java | 21 + .../phoenix/mapreduce/index/BaseIndexTest.java | 97 +++ .../index/IndexScrutinyTableOutputTest.java | 87 +++ .../mapreduce/util/IndexColumnNamesTest.java | 74 +++ .../org/apache/phoenix/util/QueryUtilTest.java | 18 +- 15 files changed, 2817 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java new file mode 100644 index 0000000..5068610 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT; +import static org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.mapreduce.CsvBulkImportUtil; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; +import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters; +import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Tests for the {@link IndexScrutinyTool} + */ +public class IndexScrutinyToolIT extends BaseTest { + + private static final String DATA_TABLE_DDL = + "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)"; + + private static final String INDEX_TABLE_DDL = "CREATE INDEX %s ON %s (NAME, EMPLOY_DATE) INCLUDE (ZIP)"; + + private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)"; + + private static final String INDEX_UPSERT_SQL = "UPSERT INTO %s (\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)"; + + private static final String DELETE_SQL = "DELETE FROM %s "; + + private String schemaName; + private String dataTableName; + private String dataTableFullName; + private String indexTableName; + private String indexTableFullName; + private String outputDir; + + private Connection conn; + + private PreparedStatement dataTableUpsertStmt; + + private PreparedStatement indexTableUpsertStmt; + + private long testTime; + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMap(); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + /** + * Create 
the test data and index tables + */ + @Before + public void setup() throws SQLException { + generateUniqueTableNames(); + createTestTable(getUrl(), String.format(DATA_TABLE_DDL, dataTableFullName)); + createTestTable(getUrl(), + String.format(INDEX_TABLE_DDL, indexTableName, dataTableFullName)); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName); + dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert); + String indexTableUpsert = String.format(INDEX_UPSERT_SQL, indexTableFullName); + indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert); + conn.setAutoCommit(false); + testTime = System.currentTimeMillis(); + } + + @After + public void teardown() throws SQLException { + if (conn != null) { + conn.close(); + } + } + + /** + * Tests a data table that is correctly indexed. Scrutiny should report all rows as valid. + */ + @Test + public void testValidIndex() throws Exception { + // insert two rows + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + int numDataRows = countRows(dataTableFullName); + int numIndexRows = countRows(indexTableFullName); + + // scrutiny should report everything as ok + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT)); + + // make sure row counts weren't modified by scrutiny + assertEquals(numDataRows, countRows(dataTableFullName)); + assertEquals(numIndexRows, countRows(indexTableFullName)); + } + + /** + * Tests an index with the same # of rows as the data table, but one of the index rows is + * incorrect Scrutiny should report the invalid rows. + */ + @Test + public void testEqualRowCountIndexIncorrect() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + // insert a bad row into the index + upsertIndexRow("badName", 2, 9999); + conn.commit(); + + // scrutiny should report the bad row + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + } + + /** + * Tests an index where the index pk is correct (indexed col values are indexed correctly), but + * a covered index value is incorrect. 
Scrutiny should report the invalid row + */ + @Test + public void testCoveredValueIncorrect() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable index and insert another data row + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + // insert a bad index row for the above data row + upsertIndexRow("name-2", 2, 9999); + conn.commit(); + + // scrutiny should report the bad row + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT)); + } + + /** + * Test batching of row comparisons Inserts 1001 rows, with some random bad rows, and runs + * scrutiny with batchsize of 10, + */ + @Test + public void testBatching() throws Exception { + // insert 1001 data and index rows + int numTestRows = 1001; + for (int i = 0; i < numTestRows; i++) { + upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000); + } + conn.commit(); + + disableIndex(); + + // randomly delete some rows from the index + Random random = new Random(); + for (int i = 0; i < 100; i++) { + int idToDelete = random.nextInt(numTestRows); + deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete); + } + conn.commit(); + int numRows = countRows(indexTableFullName); + int numDeleted = numTestRows - numRows; + + // run scrutiny with batch size of 10 + List<Job> completedJobs = + runScrutiny(schemaName, dataTableName, indexTableName, System.currentTimeMillis(), + 10L); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(numTestRows - numDeleted, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT)); + assertEquals(numTestRows / 10 + numTestRows % 10, + getCounterValue(counters, BATCHES_PROCESSED_COUNT)); + } + + /** + * Tests when there are more data table rows than index table rows Scrutiny should report the + * number of incorrect rows + */ + @Test + public void testMoreDataRows() throws Exception { + upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); + conn.commit(); + disableIndex(); + // these rows won't have a corresponding index row + upsertRow(dataTableUpsertStmt, 2, "name-2", 95124); + upsertRow(dataTableUpsertStmt, 3, "name-3", 95125); + conn.commit(); + + List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); + } + + /** + * Tests when there are more index table rows than data table rows Scrutiny should report the + * number of incorrect rows when run with the index as the source table + */ + @Test + public void testMoreIndexRows() throws Exception { + upsertRow(dataTableUpsertStmt, 1, "name-1", 95123); + conn.commit(); + disableIndex(); + // these index rows won't have a corresponding data row + upsertIndexRow("name-2", 2, 95124); + upsertIndexRow("name-3", 3, 95125); + conn.commit(); + + List<Job> completedJobs = + runScrutiny(schemaName, dataTableName, indexTableName, 
System.currentTimeMillis(), + 10L, SourceTable.INDEX_TABLE_SOURCE); + Job job = completedJobs.get(0); + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); + } + + /** + * Tests running with both the index and data tables as the source table If we have an + * incorrectly indexed row, it should be reported in each direction + */ + @Test + public void testBothDataAndIndexAsSource() throws Exception { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + conn.commit(); + + // insert a bad row into the index + upsertIndexRow("badName", 2, 9999); + conn.commit(); + + List<Job> completedJobs = + runScrutiny(schemaName, dataTableName, indexTableName, System.currentTimeMillis(), + 10L, SourceTable.BOTH); + assertEquals(2, completedJobs.size()); + for (Job job : completedJobs) { + assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); + assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); + assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); + } + } + + /** + * Tests that with the output to file option set, the scrutiny tool outputs invalid rows to file + */ + @Test + public void testOutputInvalidRowsToFile() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + + String[] argValues = + getArgValues(schemaName, dataTableName, indexTableName, System.currentTimeMillis(), + 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.FILE, null); + runScrutiny(argValues); + + // check the output files + Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), dataTableFullName); + DistributedFileSystem fs = getUtility().getDFSCluster().getFileSystem(); + Path outputFilePath = new Path(outputPath, "part-m-00000"); + assertTrue(fs.exists(outputFilePath)); + FSDataInputStream fsDataInputStream = fs.open(outputFilePath); + BufferedReader reader = new BufferedReader(new InputStreamReader(fsDataInputStream)); + try { + assertEquals("[2, name-2, " + new Timestamp(testTime).toString() + ", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", reader.readLine()); + assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + ", 95123]\tTarget row not found", reader.readLine()); + } finally { + reader.close(); + fsDataInputStream.close(); + } + } + + /** + * Tests writing of results to the output table + */ + @Test + public void testOutputInvalidRowsToTable() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + String[] argValues = + getArgValues(schemaName, dataTableName, indexTableName, System.currentTimeMillis(), + 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, null); + List<Job> completedJobs = runScrutiny(argValues); + + // check that the output table contains the invalid rows + long scrutinyTimeMillis = + PhoenixConfigurationUtil + .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); + String invalidRowsQuery = + IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(), + scrutinyTimeMillis); + ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + " ORDER BY ID asc"); + assertTrue(rs.next()); + assertEquals(dataTableFullName, + rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); + 
assertEquals(indexTableFullName, + rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); + assertTrue(rs.getBoolean("HAS_TARGET_ROW")); + assertEquals(2, rs.getInt("ID")); + assertEquals(2, rs.getInt(":ID")); + assertEquals(95123, rs.getInt("ZIP")); + assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect + assertTrue(rs.next()); + assertEquals(dataTableFullName, + rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME)); + assertEquals(indexTableFullName, + rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)); + assertFalse(rs.getBoolean("HAS_TARGET_ROW")); + assertEquals(3, rs.getInt("ID")); + assertEquals(null, rs.getObject(":ID")); // null for missing target row + assertFalse(rs.next()); + + // check that the job results were written correctly to the metadata table + assertMetadataTableValues(argValues, scrutinyTimeMillis, invalidRowsQuery); + } + + /** + * Tests that the config for max number of output rows is observed + */ + @Test + public void testMaxOutputRows() throws Exception { + insertOneValid_OneBadVal_OneMissingTarget(); + // set max to 1. There are two bad rows, but only 1 should get written to output table + String[] argValues = + getArgValues(schemaName, dataTableName, indexTableName, System.currentTimeMillis(), + 10L, SourceTable.DATA_TABLE_SOURCE, true, OutputFormat.TABLE, new Long(1)); + List<Job> completedJobs = runScrutiny(argValues); + long scrutinyTimeMillis = + PhoenixConfigurationUtil + .getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration()); + String invalidRowsQuery = + IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, getColNames(), + scrutinyTimeMillis); + ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery); + assertTrue(rs.next()); + assertFalse(rs.next()); + } + + private SourceTargetColumnNames getColNames() throws SQLException { + PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName); + PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName); + SourceTargetColumnNames columnNames = + new SourceTargetColumnNames.DataSourceColNames(pdataTable, pindexTable); + return columnNames; + } + + // inserts one valid data/index row, one data row with a missing index row, + // and one data row with an index row that has a bad covered col val + private void insertOneValid_OneBadVal_OneMissingTarget() throws SQLException { + // insert one valid row + upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); + conn.commit(); + + // disable the index and insert another row which is not indexed + disableIndex(); + upsertRow(dataTableUpsertStmt, 2, "name-2", 95123); + upsertRow(dataTableUpsertStmt, 3, "name-3", 95123); + conn.commit(); + + // insert a bad index row for one of the above data rows + upsertIndexRow("name-2", 2, 9999); + conn.commit(); + } + + private void assertMetadataTableValues(String[] argValues, long scrutinyTimeMillis, + String invalidRowsQuery) throws SQLException { + ResultSet rs; + ResultSet metadataRs = + IndexScrutinyTableOutput.queryAllMetadata(conn, dataTableFullName, + indexTableFullName, scrutinyTimeMillis); + assertTrue(metadataRs.next()); + List<? 
extends Object> expected = + Arrays.asList(dataTableFullName, indexTableFullName, scrutinyTimeMillis, + SourceTable.DATA_TABLE_SOURCE.name(), Arrays.toString(argValues), 3L, 0L, 1L, + 2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, \"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]", + "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery); + assertRsValues(metadataRs, expected); + String missingTargetQuery = metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET"); + rs = conn.createStatement().executeQuery(missingTargetQuery); + assertTrue(rs.next()); + assertEquals(3, rs.getInt("ID")); + assertFalse(rs.next()); + String badCoveredColQuery = metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL"); + rs = conn.createStatement().executeQuery(badCoveredColQuery); + assertTrue(rs.next()); + assertEquals(2, rs.getInt("ID")); + assertFalse(rs.next()); + } + + // assert the result set contains the expected values in the given order + private void assertRsValues(ResultSet rs, List<? extends Object> expected) throws SQLException { + for (int i = 0; i < expected.size(); i++) { + assertEquals(expected.get(i), rs.getObject(i + 1)); + } + } + + private void generateUniqueTableNames() { + schemaName = generateUniqueName(); + dataTableName = generateUniqueName(); + dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + indexTableName = generateUniqueName(); + indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + } + + private int countRows(String tableFullName) throws SQLException { + ResultSet count = + conn.createStatement().executeQuery("select count(*) from " + tableFullName); + count.next(); + int numRows = count.getInt(1); + return numRows; + } + + private void upsertIndexRow(String name, int id, int zip) throws SQLException { + indexTableUpsertStmt.setString(1, name); + indexTableUpsertStmt.setInt(2, id); // id + indexTableUpsertStmt.setInt(3, zip); // bad zip + indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime)); + indexTableUpsertStmt.executeUpdate(); + } + + private void disableIndex() throws SQLException { + conn.createStatement().execute( + String.format("ALTER INDEX %s ON %S disable", indexTableName, dataTableFullName)); + conn.commit(); + } + + private long getCounterValue(Counters counters, Enum<PhoenixScrutinyJobCounters> counter) { + return counters.findCounter(counter).getValue(); + } + + private String[] getArgValues(String schemaName, String dataTable, String indxTable, long ts, + Long batchSize, SourceTable sourceTable, boolean outputInvalidRows, + OutputFormat outputFormat, Long maxOutputRows) { + final List<String> args = Lists.newArrayList(); + if (schemaName != null) { + args.add("-s"); + args.add(schemaName); + } + args.add("-dt"); + args.add(dataTable); + args.add("-it"); + args.add(indxTable); + + // TODO test snapshot reads + // if(useSnapshot) { + // args.add("-snap"); + // } + + if (OutputFormat.FILE.equals(outputFormat)) { + args.add("-op"); + outputDir = "/tmp/" + UUID.randomUUID().toString(); + args.add(outputDir); + } + args.add("-t"); + args.add(String.valueOf(ts)); + args.add("-run-foreground"); + if (batchSize != null) { + args.add("-b"); + args.add(String.valueOf(batchSize)); + } + + // default to using data table as the source table + args.add("-src"); + if (sourceTable == null) { + args.add(SourceTable.DATA_TABLE_SOURCE.name()); + } else { + args.add(sourceTable.name()); + } + if (outputInvalidRows) { + args.add("-o"); + } + if (outputFormat != null) { + 
args.add("-of"); + args.add(outputFormat.name()); + } + if (maxOutputRows != null) { + args.add("-om"); + args.add(maxOutputRows.toString()); + } + return args.toArray(new String[0]); + } + + private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName) + throws Exception { + return runScrutiny(schemaName, dataTableName, indexTableName, System.currentTimeMillis()); + } + + private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName, + long ts) throws Exception { + return runScrutiny(schemaName, dataTableName, indexTableName, ts, null, null); + } + + private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName, + long ts, Long batchSize) throws Exception { + return runScrutiny(schemaName, dataTableName, indexTableName, ts, batchSize, null); + } + + private List<Job> runScrutiny(String schemaName, String dataTableName, String indexTableName, + long ts, Long batchSize, SourceTable sourceTable) throws Exception { + final String[] cmdArgs = + getArgValues(schemaName, dataTableName, indexTableName, ts, batchSize, sourceTable, + false, null, null); + return runScrutiny(cmdArgs); + } + + private List<Job> runScrutiny(String[] cmdArgs) throws Exception { + IndexScrutinyTool scrutiny = new IndexScrutinyTool(); + Configuration conf = new Configuration(getUtility().getConfiguration()); + scrutiny.setConf(conf); + int status = scrutiny.run(cmdArgs); + assertEquals(0, status); + return scrutiny.getJobs(); + } + + private void upsertRow(PreparedStatement stmt, int id, String name, int zip) + throws SQLException { + int index = 1; + // insert row + stmt.setInt(index++, id); + stmt.setString(index++, name); + stmt.setInt(index++, zip); + stmt.setTimestamp(index++, new Timestamp(testTime)); + stmt.executeUpdate(); + } + + private int deleteRow(String fullTableName, String whereCondition) throws SQLException { + String deleteSql = String.format(DELETE_SQL, indexTableFullName) + whereCondition; + PreparedStatement deleteStmt = conn.prepareStatement(deleteSql); + return deleteStmt.executeUpdate(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java new file mode 100644 index 0000000..81081bf --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce.index; + +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.codec.binary.Hex; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.mapreduce.PhoenixJobCounters; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.parse.HintNode.Hint; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; + +/** + * Mapper that reads from the data table and checks the rows against the index table + */ +public class IndexScrutinyMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, Text, Text> { + + private static final Logger LOG = LoggerFactory.getLogger(IndexScrutinyMapper.class); + private Connection connection; + private List<ColumnInfo> targetTblColumnMetadata; + private long batchSize; + // holds a batch of rows from the table the mapper is iterating over + // Each row is a pair - the row TS, and the row values + private List<Pair<Long, List<Object>>> currentBatchValues = new ArrayList<>(); + private String targetTableQuery; + private int numTargetPkCols; + private boolean outputInvalidRows; + private OutputFormat outputFormat = OutputFormat.FILE; + private String qSourceTable; + private String qTargetTable; + private long executeTimestamp; + private int numSourcePkCols; + private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable(); + private List<ColumnInfo> sourceTblColumnMetadata; + + // used to write results to the output table + private Connection outputConn; + private PreparedStatement outputUpsertStmt; + private long outputMaxRows; + private MessageDigest md5; + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + final Configuration configuration = context.getConfiguration(); + try { + // get a connection with correct CURRENT_SCN (so incoming writes don't throw off the + // scrutiny) + final Properties overrideProps = new Properties(); + String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); + overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); + connection = ConnectionUtil.getOutputConnection(configuration, overrideProps); + connection.setAutoCommit(false); + batchSize = PhoenixConfigurationUtil.getScrutinyBatchSize(configuration); + outputInvalidRows = + PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration); + outputFormat = PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration); + 
executeTimestamp = PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration); + + // get the index table and column names + String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(configuration); + final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); + final String qIndexTable = + PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration); + final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable); + + // set the target table based on whether we're running the MR over the data or index + // table + SourceTable sourceTable = + PhoenixConfigurationUtil.getScrutinySourceTable(configuration); + SourceTargetColumnNames columnNames = + SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) + ? new SourceTargetColumnNames.DataSourceColNames(pdataTable, + pindexTable) + : new SourceTargetColumnNames.IndexSourceColNames(pdataTable, + pindexTable); + qSourceTable = columnNames.getQualifiedSourceTableName(); + qTargetTable = columnNames.getQualifiedTargetTableName(); + List<String> targetColNames = columnNames.getTargetColNames(); + List<String> sourceColNames = columnNames.getSourceColNames(); + List<String> targetPkColNames = columnNames.getTargetPkColNames(); + String targetPksCsv = + Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(targetPkColNames)); + numSourcePkCols = columnNames.getSourcePkColNames().size(); + numTargetPkCols = targetPkColNames.size(); + + if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) { + outputConn = ConnectionUtil.getOutputConnection(configuration, new Properties()); + String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); + this.outputUpsertStmt = outputConn.prepareStatement(upsertQuery); + } + outputMaxRows = PhoenixConfigurationUtil.getScrutinyOutputMax(configuration); + + // Create the query against the target table + // Our query projection should be all the index column names (or their data table + // equivalent + // name) + targetTableQuery = + QueryUtil.constructSelectStatement(qTargetTable, columnNames.getCastedTargetColNames(), targetPksCsv, + Hint.NO_INDEX, false) + " IN "; + targetTblColumnMetadata = + PhoenixRuntime.generateColumnInfo(connection, qTargetTable, targetColNames); + sourceTblColumnMetadata = + PhoenixRuntime.generateColumnInfo(connection, qSourceTable, sourceColNames); + LOG.info("Target table base query: " + targetTableQuery); + md5 = MessageDigest.getInstance("MD5"); + } catch (SQLException | NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context) + throws IOException, InterruptedException { + try { + final List<Object> values = record.getValues(); + + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + currentBatchValues.add(new Pair<>(record.getRowTs(), values)); + if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize != 0) { + // if we haven't hit the batch size, just report progress and move on to next record + context.progress(); + return; + } else { + // otherwise, process the batch + processBatch(context); + } + context.progress(); // Make sure progress is reported to Application Master. 
+ } catch (SQLException | IllegalArgumentException e) { + LOG.error(" Error while read/write of a record ", e); + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + throw new IOException(e); + } + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + super.cleanup(context); + if (connection != null) { + try { + processBatch(context); + connection.close(); + if (outputConn != null) { + outputConn.close(); + } + } catch (SQLException e) { + LOG.error("Error while closing connection in the PhoenixIndexMapper class ", e); + throw new IOException(e); + } + } + } + + private void processBatch(Context context) + throws SQLException, IOException, InterruptedException { + if (currentBatchValues.size() == 0) return; + context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1); + // our query selection filter should be the PK columns of the target table (index or data + // table) + String inClause = + QueryUtil.constructParameterizedInClause(numTargetPkCols, + currentBatchValues.size()); + String indexQuery = targetTableQuery + inClause; + try (PreparedStatement targetStatement = connection.prepareStatement(indexQuery)) { + // while we build the PreparedStatement, we also maintain a hash of the target table + // PKs, + // which we use to join against the results of the query on the target table + Map<String, Pair<Long, List<Object>>> targetPkToSourceValues = + buildTargetStatement(targetStatement); + + // fetch results from the target table and output invalid rows + queryTargetTable(context, targetStatement, targetPkToSourceValues); + + // any source values we have left over are invalid (e.g. data table rows without + // corresponding index row) + context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT) + .increment(targetPkToSourceValues.size()); + if (outputInvalidRows) { + for (Pair<Long, List<Object>> sourceRowWithoutTargetRow : targetPkToSourceValues.values()) { + List<Object> valuesWithoutTarget = sourceRowWithoutTargetRow.getSecond(); + if (OutputFormat.FILE.equals(outputFormat)) { + context.write( + new Text(Arrays.toString(valuesWithoutTarget.toArray())), + new Text("Target row not found")); + } else if (OutputFormat.TABLE.equals(outputFormat)) { + writeToOutputTable(context, valuesWithoutTarget, null, sourceRowWithoutTargetRow.getFirst(), -1L); + } + } + } + if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) { + outputUpsertStmt.executeBatch(); // write out invalid rows to output table + outputConn.commit(); + } + currentBatchValues.clear(); + } + } + + private Map<String, Pair<Long, List<Object>>> buildTargetStatement(PreparedStatement targetStatement) + throws SQLException { + Map<String, Pair<Long, List<Object>>> targetPkToSourceValues = + new HashMap<>(currentBatchValues.size()); + int rsIndex = 1; + for (Pair<Long, List<Object>> batchTsRow : currentBatchValues) { + List<Object> batchRow = batchTsRow.getSecond(); + // our original query against the source table (which provided the batchRow) projected + // with the data table PK cols first, so the first numTargetPkCols form the PK + String targetPkHash = getPkHash(batchRow.subList(0, numTargetPkCols)); + targetPkToSourceValues.put(targetPkHash, batchTsRow); + for (int i = 0; i < numTargetPkCols; i++) { + ColumnInfo targetPkInfo = targetTblColumnMetadata.get(i); + Object value = batchRow.get(i); + if (value == null) { + targetStatement.setNull(rsIndex++, targetPkInfo.getSqlType()); + } else { + 
targetStatement.setObject(rsIndex++, value, targetPkInfo.getSqlType()); + } + } + } + return targetPkToSourceValues; + } + + private void queryTargetTable(Context context, PreparedStatement targetStatement, + Map<String, Pair<Long, List<Object>>> targetPkToSourceValues) + throws SQLException, IOException, InterruptedException { + ResultSet targetResultSet = targetStatement.executeQuery(); + + while (targetResultSet.next()) { + indxWritable.readFields(targetResultSet); + List<Object> targetValues = indxWritable.getValues(); + // first grab the PK and try to join against the source input + // the query is such that first numTargetPkCols of the resultSet is the PK + List<Object> pkObjects = new ArrayList<>(numTargetPkCols); + for (int i = 0; i < numTargetPkCols; i++) { + Object pkPart = targetResultSet.getObject(i + 1); + pkObjects.add(pkPart); + } + Long targetTS = targetResultSet.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp(); + String targetPk = getPkHash(pkObjects); + + // use the pk to fetch the source table column values + Pair<Long, List<Object>> sourceTsValues = targetPkToSourceValues.get(targetPk); + + Long sourceTS = sourceTsValues.getFirst(); + List<Object> sourceValues = sourceTsValues.getSecond(); + // compare values starting after the PK (i.e. covered columns) + boolean isIndexedCorrectly = + compareValues(numTargetPkCols, targetValues, sourceValues, context); + if (isIndexedCorrectly) { + context.getCounter(PhoenixScrutinyJobCounters.VALID_ROW_COUNT).increment(1); + } else { + context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).increment(1); + if (outputInvalidRows) { + outputInvalidRow(context, sourceValues, targetValues, sourceTS, targetTS); + } + } + targetPkToSourceValues.remove(targetPk); + } + } + + private void outputInvalidRow(Context context, List<Object> sourceValues, + List<Object> targetValues, long sourceTS, long targetTS) throws SQLException, IOException, InterruptedException { + if (OutputFormat.FILE.equals(outputFormat)) { + context.write(new Text(Arrays.toString(sourceValues.toArray())), + new Text(Arrays.toString(targetValues.toArray()))); + } else if (OutputFormat.TABLE.equals(outputFormat)) { + writeToOutputTable(context, sourceValues, targetValues, sourceTS, targetTS); + } + } + + // pass in null targetValues if the target row wasn't found + private void writeToOutputTable(Context context, List<Object> sourceValues, List<Object> targetValues, long sourceTS, long targetTS) + throws SQLException { + if (context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).getValue() > outputMaxRows) { + return; + } + int index = 1; + outputUpsertStmt.setString(index++, qSourceTable); // SOURCE_TABLE + outputUpsertStmt.setString(index++, qTargetTable); // TARGET_TABLE + outputUpsertStmt.setLong(index++, executeTimestamp); // SCRUTINY_EXECUTE_TIME + outputUpsertStmt.setString(index++, getPkHash(sourceValues.subList(0, numSourcePkCols))); // SOURCE_ROW_PK_HASH + outputUpsertStmt.setLong(index++, sourceTS); // SOURCE_TS + outputUpsertStmt.setLong(index++, targetTS); // TARGET_TS + outputUpsertStmt.setBoolean(index++, targetValues != null); // HAS_TARGET_ROW + index = setStatementObjects(sourceValues, index, sourceTblColumnMetadata); + if (targetValues != null) { + index = setStatementObjects(targetValues, index, targetTblColumnMetadata); + } else { // for case where target row wasn't found, put nulls in prepared statement + for (int i = 0; i < sourceValues.size(); i++) { + outputUpsertStmt.setNull(index++, 
targetTblColumnMetadata.get(i).getSqlType()); + } + } + outputUpsertStmt.addBatch(); + } + + private int setStatementObjects(List<Object> values, int index, List<ColumnInfo> colMetadata) + throws SQLException { + for (int i = 0; i < values.size(); i++) { + Object value = values.get(i); + ColumnInfo colInfo = colMetadata.get(i); + if (value != null) { + outputUpsertStmt.setObject(index++, value, colInfo.getSqlType()); + } else { + outputUpsertStmt.setNull(index++, colInfo.getSqlType()); + } + } + return index; + } + + private boolean compareValues(int startIndex, List<Object> targetValues, + List<Object> sourceValues, Context context) throws SQLException { + if (targetValues == null || sourceValues == null) return false; + for (int i = startIndex; i < sourceValues.size(); i++) { + Object targetValue = targetValues.get(i); + Object sourceValue = sourceValues.get(i); + if (targetValue != null && !targetValue.equals(sourceValue)) { + context.getCounter(PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT) + .increment(1); + return false; + } + } + return true; + } + + private String getPkHash(List<Object> pkObjects) { + try { + for (int i = 0; i < pkObjects.size(); i++) { + md5.update(sourceTblColumnMetadata.get(i).getPDataType().toBytes(pkObjects.get(i))); + } + return Hex.encodeHexString(md5.digest()); + } finally { + md5.reset(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java new file mode 100644 index 0000000..411214e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java @@ -0,0 +1,345 @@ +/** +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce.index; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.mapreduce.PhoenixJobCounters; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; +import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames; +import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.IndexSourceColNames; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +/** + * + * IndexScrutiny MapReduce output table DDL and methods to get queries against the output tables + * + */ +public class IndexScrutinyTableOutput { + + /** + * This table holds the invalid rows in the source table (either missing a target, or a bad + * covered column value). Dynamic columns hold the original source and target table column data. + */ + public static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_SCRUTINY"; + public static final String SCRUTINY_EXECUTE_TIME_COL_NAME = "SCRUTINY_EXECUTE_TIME"; + public static final String TARGET_TABLE_COL_NAME = "TARGET_TABLE"; + public static final String SOURCE_TABLE_COL_NAME = "SOURCE_TABLE"; + public static final String OUTPUT_TABLE_DDL = + "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE_NAME + "\n" + + "(\n" + + " " + SOURCE_TABLE_COL_NAME + " VARCHAR NOT NULL,\n" + + " " + TARGET_TABLE_COL_NAME + " VARCHAR NOT NULL,\n" + + " " + SCRUTINY_EXECUTE_TIME_COL_NAME + " BIGINT NOT NULL,\n" + + " SOURCE_ROW_PK_HASH VARCHAR NOT NULL,\n" + + " SOURCE_TS BIGINT,\n" + + " TARGET_TS BIGINT,\n" + + " HAS_TARGET_ROW BOOLEAN,\n" + + " CONSTRAINT PK PRIMARY KEY\n" + + " (\n" + + " " + SOURCE_TABLE_COL_NAME + ",\n" + + " " + TARGET_TABLE_COL_NAME + ",\n" + + " " + SCRUTINY_EXECUTE_TIME_COL_NAME + ",\n" + // time at which the scrutiny ran + " SOURCE_ROW_PK_HASH\n" + // this hash makes the PK unique + " )\n" + // dynamic columns consisting of the source and target columns will follow + ")"; + + /** + * This table holds metadata about a scrutiny job - result counters and queries to fetch invalid + * row data from the output table. 
The queries contain the dynamic columns which are equivalent + * to the original source/target table columns + */ + public static final String OUTPUT_METADATA_TABLE_NAME = "PHOENIX_INDEX_SCRUTINY_METADATA"; + public static final String OUTPUT_METADATA_DDL = + "CREATE TABLE IF NOT EXISTS " + OUTPUT_METADATA_TABLE_NAME + "\n" + + "(\n" + + " " + SOURCE_TABLE_COL_NAME + " VARCHAR NOT NULL,\n" + + " " + TARGET_TABLE_COL_NAME + " VARCHAR NOT NULL,\n" + + " " + SCRUTINY_EXECUTE_TIME_COL_NAME + " BIGINT NOT NULL,\n" + + " SOURCE_TYPE VARCHAR,\n" + // source is either data or index table + " CMD_LINE_ARGS VARCHAR,\n" + // arguments the tool was run with + " INPUT_RECORDS BIGINT,\n" + + " FAILED_RECORDS BIGINT,\n" + + " VALID_ROW_COUNT BIGINT,\n" + + " INVALID_ROW_COUNT BIGINT,\n" + + " INCORRECT_COVERED_COL_VAL_COUNT BIGINT,\n" + + " BATCHES_PROCESSED_COUNT BIGINT,\n" + + " SOURCE_DYNAMIC_COLS VARCHAR,\n" + + " TARGET_DYNAMIC_COLS VARCHAR,\n" + + " INVALID_ROWS_QUERY_ALL VARCHAR,\n" + // stored sql query to fetch all the invalid rows from the output table + " INVALID_ROWS_QUERY_MISSING_TARGET VARCHAR,\n" + // stored sql query to fetch all the invalid rows which are missing a target row + " INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL VARCHAR,\n" + // stored sql query to fetch all the invalid rows which have bad covered column values + " CONSTRAINT PK PRIMARY KEY\n" + + " (\n" + + " " + SOURCE_TABLE_COL_NAME + ",\n" + + " " + TARGET_TABLE_COL_NAME + ",\n" + + " " + SCRUTINY_EXECUTE_TIME_COL_NAME + "\n" + + " )\n" + + ")\n"; + + public static final String UPSERT_METADATA_SQL = "UPSERT INTO " + OUTPUT_METADATA_TABLE_NAME + " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + + /** + * Gets the parameterized upsert sql to the output table Used by the scrutiny MR job to write + * its results + * @param sourceDynamicCols list of source columns with their types + * @param targetDynamicCols list of target columns with their types + * @param connection connection to use + * @throws SQLException + */ + public static String constructOutputTableUpsert(List<String> sourceDynamicCols, + List<String> targetDynamicCols, Connection connection) throws SQLException { + List<String> outputTableColumns = getOutputTableColumns(connection); + + // construct a dynamic column upsert into the output table + List<String> upsertCols = + Lists.newArrayList( + Iterables.concat(outputTableColumns, sourceDynamicCols, targetDynamicCols)); + String upsertStmt = + QueryUtil.constructUpsertStatement(IndexScrutinyTableOutput.OUTPUT_TABLE_NAME, + upsertCols, null); + return upsertStmt; + } + + /** + * Get the sql to store as INVALID_ROWS_QUERY_ALL in the output metadata table + * @param conn + * @param columnNames + * @param scrutinyTimeMillis + * @return + * @throws SQLException + */ + public static String getSqlQueryAllInvalidRows(Connection conn, + SourceTargetColumnNames columnNames, long scrutinyTimeMillis) throws SQLException { + String paramQuery = getAllInvalidParamQuery(conn, columnNames); + paramQuery = bindPkCols(columnNames, scrutinyTimeMillis, paramQuery); + return paramQuery; + } + + /** + * Get the sql to store as INVALID_ROWS_QUERY_MISSING_TARGET in the output metadata table + * @param conn + * @param columnNames + * @param scrutinyTimeMillis + * @return + * @throws SQLException + */ + public static String getSqlQueryMissingTargetRows(Connection conn, + SourceTargetColumnNames columnNames, long scrutinyTimeMillis) throws SQLException { + String paramQuery = getHasTargetRowQuery(conn, columnNames, scrutinyTimeMillis); + return 
paramQuery.replaceFirst("\\?", "false"); + } + + /** + * Get the sql to store as INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL in the output metadata table + * @param conn + * @param columnNames + * @param scrutinyTimeMillis + * @return + * @throws SQLException + */ + public static String getSqlQueryBadCoveredColVal(Connection conn, + SourceTargetColumnNames columnNames, long scrutinyTimeMillis) throws SQLException { + String paramQuery = getHasTargetRowQuery(conn, columnNames, scrutinyTimeMillis); + return paramQuery.replaceFirst("\\?", "true"); + } + + /** + * Query the metadata table for the given columns + * @param conn connection to use + * @param selectCols columns to select from the metadata table + * @param qSourceTableName source table full name + * @param qTargetTableName target table full name + * @param scrutinyTimeMillis time when scrutiny was run + * @return + * @throws SQLException + */ + public static ResultSet queryMetadata(Connection conn, List<String> selectCols, + String qSourceTableName, String qTargetTableName, long scrutinyTimeMillis) + throws SQLException { + PreparedStatement ps = conn.prepareStatement(constructMetadataParamQuery(selectCols)); + ps.setString(1, qSourceTableName); + ps.setString(2, qTargetTableName); + ps.setLong(3, scrutinyTimeMillis); + return ps.executeQuery(); + } + + /** + * Query the metadata table for all columns + * @param conn connection to use + * @param qSourceTableName source table full name + * @param qTargetTableName target table full name + * @param scrutinyTimeMillis time when scrutiny was run + * @return + * @throws SQLException + */ + public static ResultSet queryAllMetadata(Connection conn, String qSourceTableName, + String qTargetTableName, long scrutinyTimeMillis) throws SQLException { + PTable pMetadata = PhoenixRuntime.getTable(conn, OUTPUT_METADATA_TABLE_NAME); + List<String> metadataCols = SchemaUtil.getColumnNames(pMetadata.getColumns()); + return queryMetadata(conn, metadataCols, qSourceTableName, qTargetTableName, + scrutinyTimeMillis); + } + + /** + * Writes the results of the given jobs to the metadata table + * @param conn connection to use + * @param cmdLineArgs arguments the {@code IndexScrutinyTool} was run with + * @param completedJobs completed MR jobs + * @throws IOException + * @throws SQLException + */ + public static void writeJobResults(Connection conn, String[] cmdLineArgs, List<Job> completedJobs) throws IOException, SQLException { + PreparedStatement pStmt = conn.prepareStatement(UPSERT_METADATA_SQL); + for (Job job : completedJobs) { + Configuration conf = job.getConfiguration(); + String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(conf); + final PTable pdataTable = PhoenixRuntime.getTable(conn, qDataTable); + final String qIndexTable = PhoenixConfigurationUtil.getScrutinyIndexTableName(conf); + final PTable pindexTable = PhoenixRuntime.getTable(conn, qIndexTable); + SourceTable sourceTable = PhoenixConfigurationUtil.getScrutinySourceTable(conf); + long scrutinyExecuteTime = + PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(conf); + SourceTargetColumnNames columnNames = + SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) + ? 
new DataSourceColNames(pdataTable, + pindexTable) + : new IndexSourceColNames(pdataTable, + pindexTable); + + Counters counters = job.getCounters(); + int index = 1; + pStmt.setString(index++, columnNames.getQualifiedSourceTableName()); + pStmt.setString(index++, columnNames.getQualifiedTargetTableName()); + pStmt.setLong(index++, scrutinyExecuteTime); + pStmt.setString(index++, sourceTable.name()); + pStmt.setString(index++, Arrays.toString(cmdLineArgs)); + pStmt.setLong(index++, counters.findCounter(PhoenixJobCounters.INPUT_RECORDS).getValue()); + pStmt.setLong(index++, counters.findCounter(PhoenixJobCounters.FAILED_RECORDS).getValue()); + pStmt.setLong(index++, counters.findCounter(PhoenixScrutinyJobCounters.VALID_ROW_COUNT).getValue()); + pStmt.setLong(index++, counters.findCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).getValue()); + pStmt.setLong(index++, counters.findCounter(PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT).getValue()); + pStmt.setLong(index++, counters.findCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).getValue()); + pStmt.setString(index++, Arrays.toString(columnNames.getSourceDynamicCols().toArray())); + pStmt.setString(index++, Arrays.toString(columnNames.getTargetDynamicCols().toArray())); + pStmt.setString(index++, getSqlQueryAllInvalidRows(conn, columnNames, scrutinyExecuteTime)); + pStmt.setString(index++, getSqlQueryMissingTargetRows(conn, columnNames, scrutinyExecuteTime)); + pStmt.setString(index++, getSqlQueryBadCoveredColVal(conn, columnNames, scrutinyExecuteTime)); + pStmt.addBatch(); + } + pStmt.executeBatch(); + conn.commit(); + } + + /** + * Get the parameterized query to return all the invalid rows from a scrutiny job + */ + static String constructMetadataParamQuery(List<String> metadataSelectCols) { + String pkColsCsv = getPksCsv(); + String query = + QueryUtil.constructSelectStatement(OUTPUT_METADATA_TABLE_NAME, metadataSelectCols, + pkColsCsv, null, true); + String inClause = " IN " + QueryUtil.constructParameterizedInClause(3, 1); + return query + inClause; + } + + private static String getAllInvalidParamQuery(Connection conn, + SourceTargetColumnNames columnNames) throws SQLException { + String whereQuery = constructOutputTableQuery(conn, columnNames, getPksCsv()); + String inClause = " IN " + QueryUtil.constructParameterizedInClause(getPkCols().size(), 1); + String paramQuery = whereQuery + inClause; + return paramQuery; + } + + private static String bindPkCols(SourceTargetColumnNames columnNames, long scrutinyTimeMillis, + String paramQuery) { + paramQuery = + paramQuery.replaceFirst("\\?", + "'" + columnNames.getQualifiedSourceTableName() + "'"); + paramQuery = + paramQuery.replaceFirst("\\?", + "'" + columnNames.getQualifiedTargetTableName() + "'"); + paramQuery = paramQuery.replaceFirst("\\?", scrutinyTimeMillis + ""); + return paramQuery; + } + + private static String getHasTargetRowQuery(Connection conn, SourceTargetColumnNames columnNames, + long scrutinyTimeMillis) throws SQLException { + String whereQuery = + constructOutputTableQuery(conn, columnNames, + getPksCsv() + ", " + SchemaUtil.getEscapedFullColumnName("HAS_TARGET_ROW")); + String inClause = + " IN " + QueryUtil.constructParameterizedInClause(getPkCols().size() + 1, 1); + String paramQuery = whereQuery + inClause; + paramQuery = bindPkCols(columnNames, scrutinyTimeMillis, paramQuery); + return paramQuery; + } + + private static String getPksCsv() { + String pkColsCsv = Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(getPkCols())); + return 
pkColsCsv; + } + + private static List<String> getPkCols() { + return Arrays.asList(SOURCE_TABLE_COL_NAME, TARGET_TABLE_COL_NAME, + SCRUTINY_EXECUTE_TIME_COL_NAME); + } + + private static String constructOutputTableQuery(Connection connection, + SourceTargetColumnNames columnNames, String conditions) throws SQLException { + PTable pOutputTable = PhoenixRuntime.getTable(connection, OUTPUT_TABLE_NAME); + List<String> outputTableColumns = SchemaUtil.getColumnNames(pOutputTable.getColumns()); + List<String> selectCols = + Lists.newArrayList( + Iterables.concat(outputTableColumns, columnNames.getUnqualifiedSourceColNames(), + columnNames.getUnqualifiedTargetColNames())); + String dynamicCols = + Joiner.on(",").join(Iterables.concat(columnNames.getSourceDynamicCols(), + columnNames.getTargetDynamicCols())); + // dynamic defined after the table name + // https://phoenix.apache.org/dynamic_columns.html + String dynamicTableName = OUTPUT_TABLE_NAME + "(" + dynamicCols + ")"; + return QueryUtil.constructSelectStatement(dynamicTableName, selectCols, conditions, null, true); + } + + private static List<String> getOutputTableColumns(Connection connection) throws SQLException { + PTable pOutputTable = + PhoenixRuntime.getTable(connection, IndexScrutinyTableOutput.OUTPUT_TABLE_NAME); + List<String> outputTableColumns = SchemaUtil.getColumnNames(pOutputTable.getColumns()); + return outputTableColumns; + } + +}
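As a usage note for the two tables above (a sketch, not part of the patch): writeJobResults stores the job counters plus ready-made SQL for fetching the invalid rows back out of PHOENIX_INDEX_SCRUTINY, so results can be read back roughly as follows. The JDBC URL, table names and scrutiny timestamp are placeholder assumptions; the API calls and column names come from the DDL and methods above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;

public class ScrutinyResultsExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            long scrutinyTimeMillis = 1503959540000L; // SCRUTINY_EXECUTE_TIME of the run to inspect
            // One metadata row per (source table, target table, execute time)
            ResultSet metadataRs = IndexScrutinyTableOutput.queryAllMetadata(
                    conn, "MY_SCHEMA.MY_TABLE", "MY_SCHEMA.MY_INDEX", scrutinyTimeMillis);
            if (metadataRs.next()) {
                System.out.println("valid rows:   " + metadataRs.getLong("VALID_ROW_COUNT"));
                System.out.println("invalid rows: " + metadataRs.getLong("INVALID_ROW_COUNT"));
                // The stored query selects the invalid rows (output table columns plus the
                // dynamic source/target columns) from PHOENIX_INDEX_SCRUTINY.
                String invalidRowsQuery = metadataRs.getString("INVALID_ROWS_QUERY_ALL");
                try (ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery)) {
                    while (rs.next()) {
                        System.out.println(rs.getString("SOURCE_ROW_PK_HASH")
                                + " hasTargetRow=" + rs.getBoolean("HAS_TARGET_ROW"));
                    }
                }
            }
        }
    }
}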