PHOENIX-2460 Implement scrutiny command to validate whether
 or not an index is in sync with the data table (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/08e2a29f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/08e2a29f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/08e2a29f

Branch: refs/heads/4.x-HBase-0.98
Commit: 08e2a29f60bcedd107f96fc50cfeb8996296d56f
Parents: ce6810f
Author: James Taylor <jtay...@salesforce.com>
Authored: Mon Aug 28 15:32:20 2017 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Mon Aug 28 15:40:56 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/IndexScrutinyToolIT.java    | 612 +++++++++++++++++++
 .../mapreduce/index/IndexScrutinyMapper.java    | 377 ++++++++++++
 .../index/IndexScrutinyTableOutput.java         | 345 +++++++++++
 .../mapreduce/index/IndexScrutinyTool.java      | 522 ++++++++++++++++
 .../mapreduce/index/PhoenixIndexDBWritable.java |  11 +-
 .../index/PhoenixScrutinyJobCounters.java       |  41 ++
 .../index/SourceTargetColumnNames.java          | 195 ++++++
 .../mapreduce/util/IndexColumnNames.java        | 239 ++++++++
 .../util/PhoenixConfigurationUtil.java          | 128 +++-
 .../java/org/apache/phoenix/util/QueryUtil.java |  63 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |  21 +
 .../phoenix/mapreduce/index/BaseIndexTest.java  |  97 +++
 .../index/IndexScrutinyTableOutputTest.java     |  87 +++
 .../mapreduce/util/IndexColumnNamesTest.java    |  74 +++
 .../org/apache/phoenix/util/QueryUtilTest.java  |  18 +-
 15 files changed, 2817 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
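
For reference, a minimal sketch of driving the new tool programmatically, mirroring what
IndexScrutinyToolIT#runScrutiny does below. The schema/table names and the HBase Configuration
source are placeholders; the flags are the ones parsed in this patch.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;

    public class ScrutinyExample {
        public static void main(String[] args) throws Exception {
            String[] scrutinyArgs = new String[] {
                "-s", "MY_SCHEMA",            // schema name (placeholder)
                "-dt", "DATA_TABLE",          // data table (placeholder)
                "-it", "MY_INDEX",            // index table (placeholder)
                "-src", "DATA_TABLE_SOURCE",  // iterate over the data table
                "-b", "1000",                 // rows compared per batch
                "-o",                         // output invalid rows
                "-of", "TABLE",               // ... to the PHOENIX_INDEX_SCRUTINY table
                "-t", String.valueOf(System.currentTimeMillis()), // scrutiny timestamp
                "-run-foreground"
            };
            IndexScrutinyTool scrutiny = new IndexScrutinyTool();
            scrutiny.setConf(HBaseConfiguration.create()); // assumes cluster config on the classpath
            int status = scrutiny.run(scrutinyArgs);
            System.out.println("scrutiny exit status: " + status);
        }
    }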


http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
new file mode 100644
index 0000000..5068610
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required by applicable
+ * law or agreed to in writing, software distributed under the License is 
distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License
+ * for the specific language governing permissions and limitations under the 
License.
+ */
+package org.apache.phoenix.end2end;
+
+import static 
org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.INVALID_ROW_COUNT;
+import static 
org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters.VALID_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.index.PhoenixScrutinyJobCounters;
+import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for the {@link IndexScrutinyTool}
+ */
+public class IndexScrutinyToolIT extends BaseTest {
+
+    private static final String DATA_TABLE_DDL =
+            "CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, 
ZIP INTEGER, EMPLOY_DATE TIMESTAMP, EMPLOYER VARCHAR)";
+
+    private static final String INDEX_TABLE_DDL = "CREATE INDEX %s ON %s 
(NAME, EMPLOY_DATE) INCLUDE (ZIP)";
+
+    private static final String UPSERT_SQL = "UPSERT INTO %s VALUES(?,?,?,?)";
+
+    private static final String INDEX_UPSERT_SQL = "UPSERT INTO %s 
(\"0:NAME\", \":ID\", \"0:ZIP\", \"0:EMPLOY_DATE\") values (?,?,?,?)";
+
+    private static final String DELETE_SQL = "DELETE FROM %s ";
+
+    private String schemaName;
+    private String dataTableName;
+    private String dataTableFullName;
+    private String indexTableName;
+    private String indexTableFullName;
+    private String outputDir;
+
+    private Connection conn;
+
+    private PreparedStatement dataTableUpsertStmt;
+
+    private PreparedStatement indexTableUpsertStmt;
+
+    private long testTime;
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMap();
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    /**
+     * Create the test data and index tables
+     */
+    @Before
+    public void setup() throws SQLException {
+        generateUniqueTableNames();
+        createTestTable(getUrl(), String.format(DATA_TABLE_DDL, 
dataTableFullName));
+        createTestTable(getUrl(),
+            String.format(INDEX_TABLE_DDL, indexTableName, dataTableFullName));
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        String dataTableUpsert = String.format(UPSERT_SQL, dataTableFullName);
+        dataTableUpsertStmt = conn.prepareStatement(dataTableUpsert);
+        String indexTableUpsert = String.format(INDEX_UPSERT_SQL, 
indexTableFullName);
+        indexTableUpsertStmt = conn.prepareStatement(indexTableUpsert);
+        conn.setAutoCommit(false);
+        testTime = System.currentTimeMillis();
+    }
+
+    @After
+    public void teardown() throws SQLException {
+        if (conn != null) {
+            conn.close();
+        }
+    }
+
+    /**
+     * Tests a data table that is correctly indexed. Scrutiny should report 
all rows as valid.
+     */
+    @Test
+    public void testValidIndex() throws Exception {
+        // insert two rows
+        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+        conn.commit();
+
+        int numDataRows = countRows(dataTableFullName);
+        int numIndexRows = countRows(indexTableFullName);
+
+        // scrutiny should report everything as ok
+        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, 
indexTableName);
+        Job job = completedJobs.get(0);
+        assertTrue(job.isSuccessful());
+        Counters counters = job.getCounters();
+        assertEquals(2, getCounterValue(counters, VALID_ROW_COUNT));
+        assertEquals(0, getCounterValue(counters, INVALID_ROW_COUNT));
+
+        // make sure row counts weren't modified by scrutiny
+        assertEquals(numDataRows, countRows(dataTableFullName));
+        assertEquals(numIndexRows, countRows(indexTableFullName));
+    }
+
+    /**
+     * Tests an index with the same number of rows as the data table, but one of the index rows is
+     * incorrect. Scrutiny should report the invalid rows.
+     */
+    @Test
+    public void testEqualRowCountIndexIncorrect() throws Exception {
+        // insert one valid row
+        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+        conn.commit();
+
+        // disable the index and insert another row which is not indexed
+        disableIndex();
+        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+        conn.commit();
+
+        // insert a bad row into the index
+        upsertIndexRow("badName", 2, 9999);
+        conn.commit();
+
+        // scrutiny should report the bad row
+        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, 
indexTableName);
+        Job job = completedJobs.get(0);
+        assertTrue(job.isSuccessful());
+        Counters counters = job.getCounters();
+        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+    }
+
+    /**
+     * Tests an index where the index PK is correct (indexed column values are indexed correctly),
+     * but a covered column value is incorrect. Scrutiny should report the invalid row.
+     */
+    @Test
+    public void testCoveredValueIncorrect() throws Exception {
+        // insert one valid row
+        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+        conn.commit();
+
+        // disable index and insert another data row
+        disableIndex();
+        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+        conn.commit();
+
+        // insert a bad index row for the above data row
+        upsertIndexRow("name-2", 2, 9999);
+        conn.commit();
+
+        // scrutiny should report the bad row
+        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, 
indexTableName);
+        Job job = completedJobs.get(0);
+        assertTrue(job.isSuccessful());
+        Counters counters = job.getCounters();
+        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+        assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+        assertEquals(1, getCounterValue(counters, BAD_COVERED_COL_VAL_COUNT));
+    }
+
+    /**
+     * Tests batching of row comparisons. Inserts 1001 rows, randomly deletes some of their index rows,
+     * and runs scrutiny with a batch size of 10.
+     */
+    @Test
+    public void testBatching() throws Exception {
+        // insert 1001 data and index rows
+        int numTestRows = 1001;
+        for (int i = 0; i < numTestRows; i++) {
+            upsertRow(dataTableUpsertStmt, i, "name-" + i, i + 1000);
+        }
+        conn.commit();
+
+        disableIndex();
+
+        // randomly delete some rows from the index
+        Random random = new Random();
+        for (int i = 0; i < 100; i++) {
+            int idToDelete = random.nextInt(numTestRows);
+            deleteRow(indexTableFullName, "WHERE \":ID\"=" + idToDelete);
+        }
+        conn.commit();
+        int numRows = countRows(indexTableFullName);
+        int numDeleted = numTestRows - numRows;
+
+        // run scrutiny with batch size of 10
+        List<Job> completedJobs =
+                runScrutiny(schemaName, dataTableName, indexTableName, 
System.currentTimeMillis(),
+                    10L);
+        Job job = completedJobs.get(0);
+        assertTrue(job.isSuccessful());
+        Counters counters = job.getCounters();
+        assertEquals(numTestRows - numDeleted, getCounterValue(counters, 
VALID_ROW_COUNT));
+        assertEquals(numDeleted, getCounterValue(counters, INVALID_ROW_COUNT));
+        assertEquals(numTestRows / 10 + numTestRows % 10,
+            getCounterValue(counters, BATCHES_PROCESSED_COUNT));
+    }
+
+    /**
+     * Tests when there are more data table rows than index table rows. Scrutiny should report the
+     * number of incorrect rows.
+     */
+    @Test
+    public void testMoreDataRows() throws Exception {
+        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+        conn.commit();
+        disableIndex();
+        // these rows won't have a corresponding index row
+        upsertRow(dataTableUpsertStmt, 2, "name-2", 95124);
+        upsertRow(dataTableUpsertStmt, 3, "name-3", 95125);
+        conn.commit();
+
+        List<Job> completedJobs = runScrutiny(schemaName, dataTableName, 
indexTableName);
+        Job job = completedJobs.get(0);
+        assertTrue(job.isSuccessful());
+        Counters counters = job.getCounters();
+        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+    }
+
+    /**
+     * Tests when there are more index table rows than data table rows. Scrutiny should report the
+     * number of incorrect rows when run with the index as the source table.
+     */
+    @Test
+    public void testMoreIndexRows() throws Exception {
+        upsertRow(dataTableUpsertStmt, 1, "name-1", 95123);
+        conn.commit();
+        disableIndex();
+        // these index rows won't have a corresponding data row
+        upsertIndexRow("name-2", 2, 95124);
+        upsertIndexRow("name-3", 3, 95125);
+        conn.commit();
+
+        List<Job> completedJobs =
+                runScrutiny(schemaName, dataTableName, indexTableName, 
System.currentTimeMillis(),
+                    10L, SourceTable.INDEX_TABLE_SOURCE);
+        Job job = completedJobs.get(0);
+        assertTrue(job.isSuccessful());
+        Counters counters = job.getCounters();
+        assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+        assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT));
+    }
+
+    /**
+     * Tests running with both the index and data tables as the source table. If we have an
+     * incorrectly indexed row, it should be reported in each direction.
+     */
+    @Test
+    public void testBothDataAndIndexAsSource() throws Exception {
+        // insert one valid row
+        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+        conn.commit();
+
+        // disable the index and insert another row which is not indexed
+        disableIndex();
+        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+        conn.commit();
+
+        // insert a bad row into the index
+        upsertIndexRow("badName", 2, 9999);
+        conn.commit();
+
+        List<Job> completedJobs =
+                runScrutiny(schemaName, dataTableName, indexTableName, 
System.currentTimeMillis(),
+                    10L, SourceTable.BOTH);
+        assertEquals(2, completedJobs.size());
+        for (Job job : completedJobs) {
+            assertTrue(job.isSuccessful());
+            Counters counters = job.getCounters();
+            assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT));
+            assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT));
+        }
+    }
+
+    /**
+     * Tests that, with the output-to-file option set, the scrutiny tool outputs invalid rows to a file.
+     */
+    @Test
+    public void testOutputInvalidRowsToFile() throws Exception {
+        insertOneValid_OneBadVal_OneMissingTarget();
+
+        String[] argValues =
+                getArgValues(schemaName, dataTableName, indexTableName, 
System.currentTimeMillis(),
+                    10L, SourceTable.DATA_TABLE_SOURCE, true, 
OutputFormat.FILE, null);
+        runScrutiny(argValues);
+
+        // check the output files
+        Path outputPath = CsvBulkImportUtil.getOutputPath(new Path(outputDir), 
dataTableFullName);
+        DistributedFileSystem fs = 
getUtility().getDFSCluster().getFileSystem();
+        Path outputFilePath = new Path(outputPath, "part-m-00000");
+        assertTrue(fs.exists(outputFilePath));
+        FSDataInputStream fsDataInputStream = fs.open(outputFilePath);
+        BufferedReader reader = new BufferedReader(new 
InputStreamReader(fsDataInputStream));
+        try {
+            assertEquals("[2, name-2, " + new Timestamp(testTime).toString() + 
", 95123]\t[2, name-2, " + new Timestamp(testTime).toString() + ", 9999]", 
reader.readLine());
+            assertEquals("[3, name-3, " + new Timestamp(testTime).toString() + 
", 95123]\tTarget row not found", reader.readLine());
+        } finally {
+            reader.close();
+            fsDataInputStream.close();
+        }
+    }
+
+    /**
+     * Tests writing of results to the output table
+     */
+    @Test
+    public void testOutputInvalidRowsToTable() throws Exception {
+        insertOneValid_OneBadVal_OneMissingTarget();
+        String[] argValues =
+                getArgValues(schemaName, dataTableName, indexTableName, 
System.currentTimeMillis(),
+                    10L, SourceTable.DATA_TABLE_SOURCE, true, 
OutputFormat.TABLE, null);
+        List<Job> completedJobs = runScrutiny(argValues);
+
+        // check that the output table contains the invalid rows
+        long scrutinyTimeMillis =
+                PhoenixConfigurationUtil
+                        
.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+        String invalidRowsQuery =
+                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, 
getColNames(),
+                    scrutinyTimeMillis);
+        ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery + 
" ORDER BY ID asc");
+        assertTrue(rs.next());
+        assertEquals(dataTableFullName,
+            rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+        assertEquals(indexTableFullName,
+            rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+        assertTrue(rs.getBoolean("HAS_TARGET_ROW"));
+        assertEquals(2, rs.getInt("ID"));
+        assertEquals(2, rs.getInt(":ID"));
+        assertEquals(95123, rs.getInt("ZIP"));
+        assertEquals(9999, rs.getInt("0:ZIP")); // covered col zip incorrect
+        assertTrue(rs.next());
+        assertEquals(dataTableFullName,
+            rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME));
+        assertEquals(indexTableFullName,
+            rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME));
+        assertFalse(rs.getBoolean("HAS_TARGET_ROW"));
+        assertEquals(3, rs.getInt("ID"));
+        assertEquals(null, rs.getObject(":ID")); // null for missing target row
+        assertFalse(rs.next());
+
+        // check that the job results were written correctly to the metadata 
table
+        assertMetadataTableValues(argValues, scrutinyTimeMillis, 
invalidRowsQuery);
+    }
+
+    /**
+     * Tests that the config for max number of output rows is observed
+     */
+    @Test
+    public void testMaxOutputRows() throws Exception {
+        insertOneValid_OneBadVal_OneMissingTarget();
+        // set max to 1.  There are two bad rows, but only 1 should get 
written to output table
+        String[] argValues =
+                getArgValues(schemaName, dataTableName, indexTableName, 
System.currentTimeMillis(),
+                    10L, SourceTable.DATA_TABLE_SOURCE, true, 
OutputFormat.TABLE, new Long(1));
+        List<Job> completedJobs = runScrutiny(argValues);
+        long scrutinyTimeMillis =
+                PhoenixConfigurationUtil
+                        
.getScrutinyExecuteTimestamp(completedJobs.get(0).getConfiguration());
+        String invalidRowsQuery =
+                IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, 
getColNames(),
+                    scrutinyTimeMillis);
+        ResultSet rs = conn.createStatement().executeQuery(invalidRowsQuery);
+        assertTrue(rs.next());
+        assertFalse(rs.next());
+    }
+
+    private SourceTargetColumnNames getColNames() throws SQLException {
+        PTable pdataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+        PTable pindexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+        SourceTargetColumnNames columnNames =
+                new SourceTargetColumnNames.DataSourceColNames(pdataTable, 
pindexTable);
+        return columnNames;
+    }
+
+    // inserts one valid data/index row, one data row with a missing index row,
+    // and one data row with an index row that has a bad covered col val
+    private void insertOneValid_OneBadVal_OneMissingTarget() throws 
SQLException {
+        // insert one valid row
+        upsertRow(dataTableUpsertStmt, 1, "name-1", 94010);
+        conn.commit();
+
+        // disable the index and insert another row which is not indexed
+        disableIndex();
+        upsertRow(dataTableUpsertStmt, 2, "name-2", 95123);
+        upsertRow(dataTableUpsertStmt, 3, "name-3", 95123);
+        conn.commit();
+
+        // insert a bad index row for one of the above data rows
+        upsertIndexRow("name-2", 2, 9999);
+        conn.commit();
+    }
+
+    private void assertMetadataTableValues(String[] argValues, long 
scrutinyTimeMillis,
+            String invalidRowsQuery) throws SQLException {
+        ResultSet rs;
+        ResultSet metadataRs =
+                IndexScrutinyTableOutput.queryAllMetadata(conn, 
dataTableFullName,
+                    indexTableFullName, scrutinyTimeMillis);
+        assertTrue(metadataRs.next());
+        List<? extends Object> expected =
+                Arrays.asList(dataTableFullName, indexTableFullName, 
scrutinyTimeMillis,
+                    SourceTable.DATA_TABLE_SOURCE.name(), 
Arrays.toString(argValues), 3L, 0L, 1L,
+                    2L, 1L, 1L, "[\"ID\" INTEGER, \"NAME\" VARCHAR, 
\"EMPLOY_DATE\" TIMESTAMP, \"ZIP\" INTEGER]",
+                    "[\":ID\" INTEGER, \"0:NAME\" VARCHAR, \"0:EMPLOY_DATE\" 
DECIMAL, \"0:ZIP\" INTEGER]", invalidRowsQuery);
+        assertRsValues(metadataRs, expected);
+        String missingTargetQuery = 
metadataRs.getString("INVALID_ROWS_QUERY_MISSING_TARGET");
+        rs = conn.createStatement().executeQuery(missingTargetQuery);
+        assertTrue(rs.next());
+        assertEquals(3, rs.getInt("ID"));
+        assertFalse(rs.next());
+        String badCoveredColQuery = 
metadataRs.getString("INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL");
+        rs = conn.createStatement().executeQuery(badCoveredColQuery);
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt("ID"));
+        assertFalse(rs.next());
+    }
+
+    // assert the result set contains the expected values in the given order
+    private void assertRsValues(ResultSet rs, List<? extends Object> expected) 
throws SQLException {
+        for (int i = 0; i < expected.size(); i++) {
+            assertEquals(expected.get(i), rs.getObject(i + 1));
+        }
+    }
+
+    private void generateUniqueTableNames() {
+        schemaName = generateUniqueName();
+        dataTableName = generateUniqueName();
+        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        indexTableName = generateUniqueName();
+        indexTableFullName = SchemaUtil.getTableName(schemaName, 
indexTableName);
+    }
+
+    private int countRows(String tableFullName) throws SQLException {
+        ResultSet count =
+                conn.createStatement().executeQuery("select count(*) from " + 
tableFullName);
+        count.next();
+        int numRows = count.getInt(1);
+        return numRows;
+    }
+
+    private void upsertIndexRow(String name, int id, int zip) throws 
SQLException {
+        indexTableUpsertStmt.setString(1, name);
+        indexTableUpsertStmt.setInt(2, id); // id
+        indexTableUpsertStmt.setInt(3, zip); // bad zip
+        indexTableUpsertStmt.setTimestamp(4, new Timestamp(testTime));
+        indexTableUpsertStmt.executeUpdate();
+    }
+
+    private void disableIndex() throws SQLException {
+        conn.createStatement().execute(
+            String.format("ALTER INDEX %s ON %S disable", indexTableName, 
dataTableFullName));
+        conn.commit();
+    }
+
+    private long getCounterValue(Counters counters, 
Enum<PhoenixScrutinyJobCounters> counter) {
+        return counters.findCounter(counter).getValue();
+    }
+
+    private String[] getArgValues(String schemaName, String dataTable, String 
indxTable, long ts,
+            Long batchSize, SourceTable sourceTable, boolean outputInvalidRows,
+            OutputFormat outputFormat, Long maxOutputRows) {
+        final List<String> args = Lists.newArrayList();
+        if (schemaName != null) {
+            args.add("-s");
+            args.add(schemaName);
+        }
+        args.add("-dt");
+        args.add(dataTable);
+        args.add("-it");
+        args.add(indxTable);
+
+        // TODO test snapshot reads
+        // if(useSnapshot) {
+        // args.add("-snap");
+        // }
+
+        if (OutputFormat.FILE.equals(outputFormat)) {
+            args.add("-op");
+            outputDir = "/tmp/" + UUID.randomUUID().toString();
+            args.add(outputDir);
+        }
+        args.add("-t");
+        args.add(String.valueOf(ts));
+        args.add("-run-foreground");
+        if (batchSize != null) {
+            args.add("-b");
+            args.add(String.valueOf(batchSize));
+        }
+
+        // default to using data table as the source table
+        args.add("-src");
+        if (sourceTable == null) {
+            args.add(SourceTable.DATA_TABLE_SOURCE.name());
+        } else {
+            args.add(sourceTable.name());
+        }
+        if (outputInvalidRows) {
+            args.add("-o");
+        }
+        if (outputFormat != null) {
+            args.add("-of");
+            args.add(outputFormat.name());
+        }
+        if (maxOutputRows != null) {
+            args.add("-om");
+            args.add(maxOutputRows.toString());
+        }
+        return args.toArray(new String[0]);
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, 
String indexTableName)
+            throws Exception {
+        return runScrutiny(schemaName, dataTableName, indexTableName, 
System.currentTimeMillis());
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, 
String indexTableName,
+            long ts) throws Exception {
+        return runScrutiny(schemaName, dataTableName, indexTableName, ts, 
null, null);
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, 
String indexTableName,
+            long ts, Long batchSize) throws Exception {
+        return runScrutiny(schemaName, dataTableName, indexTableName, ts, 
batchSize, null);
+    }
+
+    private List<Job> runScrutiny(String schemaName, String dataTableName, 
String indexTableName,
+            long ts, Long batchSize, SourceTable sourceTable) throws Exception 
{
+        final String[] cmdArgs =
+                getArgValues(schemaName, dataTableName, indexTableName, ts, 
batchSize, sourceTable,
+                    false, null, null);
+        return runScrutiny(cmdArgs);
+    }
+
+    private List<Job> runScrutiny(String[] cmdArgs) throws Exception {
+        IndexScrutinyTool scrutiny = new IndexScrutinyTool();
+        Configuration conf = new 
Configuration(getUtility().getConfiguration());
+        scrutiny.setConf(conf);
+        int status = scrutiny.run(cmdArgs);
+        assertEquals(0, status);
+        return scrutiny.getJobs();
+    }
+
+    private void upsertRow(PreparedStatement stmt, int id, String name, int 
zip)
+            throws SQLException {
+        int index = 1;
+        // insert row
+        stmt.setInt(index++, id);
+        stmt.setString(index++, name);
+        stmt.setInt(index++, zip);
+        stmt.setTimestamp(index++, new Timestamp(testTime));
+        stmt.executeUpdate();
+    }
+
+    private int deleteRow(String fullTableName, String whereCondition) throws 
SQLException {
+        String deleteSql = String.format(DELETE_SQL, indexTableFullName) + 
whereCondition;
+        PreparedStatement deleteStmt = conn.prepareStatement(deleteSql);
+        return deleteStmt.executeUpdate();
+    }
+
+}
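
For reference, a minimal sketch of reading back results written with "-o -of TABLE", following the
calls exercised in testOutputInvalidRowsToTable above. The JDBC URL, table names, and the scrutiny
timestamp are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;
    import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class ScrutinyResultsExample {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            PTable dataTable = PhoenixRuntime.getTable(conn, "MY_SCHEMA.DATA_TABLE");
            PTable indexTable = PhoenixRuntime.getTable(conn, "MY_SCHEMA.MY_INDEX");
            SourceTargetColumnNames colNames =
                    new SourceTargetColumnNames.DataSourceColNames(dataTable, indexTable);
            long scrutinyTimeMillis = 0L; // SCRUTINY_EXECUTE_TIME of the run to inspect (placeholder)

            // all invalid rows (bad covered values and missing target rows) for that run
            String allInvalid = IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, colNames,
                    scrutinyTimeMillis);
            ResultSet rs = conn.createStatement().executeQuery(allInvalid);
            while (rs.next()) {
                System.out.println(rs.getString(IndexScrutinyTableOutput.SOURCE_TABLE_COL_NAME) + " -> "
                        + rs.getString(IndexScrutinyTableOutput.TARGET_TABLE_COL_NAME)
                        + ", has target row: " + rs.getBoolean("HAS_TARGET_ROW"));
            }

            // per-run counters plus the stored invalid-row queries
            ResultSet metadataRs = IndexScrutinyTableOutput.queryAllMetadata(conn,
                    "MY_SCHEMA.DATA_TABLE", "MY_SCHEMA.MY_INDEX", scrutinyTimeMillis);
            while (metadataRs.next()) {
                System.out.println("invalid rows: " + metadataRs.getLong("INVALID_ROW_COUNT"));
            }
        }
    }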

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
new file mode 100644
index 0000000..81081bf
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Mapper that reads from the source table (data or index) and checks its rows against the target table
+ */
+public class IndexScrutinyMapper extends Mapper<NullWritable, 
PhoenixIndexDBWritable, Text, Text> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IndexScrutinyMapper.class);
+    private Connection connection;
+    private List<ColumnInfo> targetTblColumnMetadata;
+    private long batchSize;
+    // holds a batch of rows from the table the mapper is iterating over
+    // Each row is a pair - the row TS, and the row values
+    private List<Pair<Long, List<Object>>> currentBatchValues = new 
ArrayList<>();
+    private String targetTableQuery;
+    private int numTargetPkCols;
+    private boolean outputInvalidRows;
+    private OutputFormat outputFormat = OutputFormat.FILE;
+    private String qSourceTable;
+    private String qTargetTable;
+    private long executeTimestamp;
+    private int numSourcePkCols;
+    private final PhoenixIndexDBWritable indxWritable = new 
PhoenixIndexDBWritable();
+    private List<ColumnInfo> sourceTblColumnMetadata;
+
+    // used to write results to the output table
+    private Connection outputConn;
+    private PreparedStatement outputUpsertStmt;
+    private long outputMaxRows;
+    private MessageDigest md5;
+
+    @Override
+    protected void setup(final Context context) throws IOException, 
InterruptedException {
+        super.setup(context);
+        final Configuration configuration = context.getConfiguration();
+        try {
+            // get a connection with correct CURRENT_SCN (so incoming writes 
don't throw off the
+            // scrutiny)
+            final Properties overrideProps = new Properties();
+            String scn = 
configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+            connection = ConnectionUtil.getOutputConnection(configuration, 
overrideProps);
+            connection.setAutoCommit(false);
+            batchSize = 
PhoenixConfigurationUtil.getScrutinyBatchSize(configuration);
+            outputInvalidRows =
+                    
PhoenixConfigurationUtil.getScrutinyOutputInvalidRows(configuration);
+            outputFormat = 
PhoenixConfigurationUtil.getScrutinyOutputFormat(configuration);
+            executeTimestamp = 
PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(configuration);
+
+            // get the index table and column names
+            String qDataTable = 
PhoenixConfigurationUtil.getScrutinyDataTableName(configuration);
+            final PTable pdataTable = PhoenixRuntime.getTable(connection, 
qDataTable);
+            final String qIndexTable =
+                    
PhoenixConfigurationUtil.getScrutinyIndexTableName(configuration);
+            final PTable pindexTable = PhoenixRuntime.getTable(connection, 
qIndexTable);
+
+            // set the target table based on whether we're running the MR over 
the data or index
+            // table
+            SourceTable sourceTable =
+                    
PhoenixConfigurationUtil.getScrutinySourceTable(configuration);
+            SourceTargetColumnNames columnNames =
+                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
+                            ? new 
SourceTargetColumnNames.DataSourceColNames(pdataTable,
+                                    pindexTable)
+                            : new 
SourceTargetColumnNames.IndexSourceColNames(pdataTable,
+                                    pindexTable);
+            qSourceTable = columnNames.getQualifiedSourceTableName();
+            qTargetTable = columnNames.getQualifiedTargetTableName();
+            List<String> targetColNames = columnNames.getTargetColNames();
+            List<String> sourceColNames = columnNames.getSourceColNames();
+            List<String> targetPkColNames = columnNames.getTargetPkColNames();
+            String targetPksCsv =
+                    
Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(targetPkColNames));
+            numSourcePkCols = columnNames.getSourcePkColNames().size();
+            numTargetPkCols = targetPkColNames.size();
+
+            if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
+                outputConn = ConnectionUtil.getOutputConnection(configuration, 
new Properties());
+                String upsertQuery = 
PhoenixConfigurationUtil.getUpsertStatement(configuration);
+                this.outputUpsertStmt = 
outputConn.prepareStatement(upsertQuery);
+            }
+            outputMaxRows = 
PhoenixConfigurationUtil.getScrutinyOutputMax(configuration);
+
+            // Create the query against the target table
+            // Our query projection should be all the index column names (or 
their data table
+            // equivalent
+            // names)
+            targetTableQuery =
+                    QueryUtil.constructSelectStatement(qTargetTable, 
columnNames.getCastedTargetColNames(), targetPksCsv,
+                        Hint.NO_INDEX, false) + " IN ";
+            targetTblColumnMetadata =
+                    PhoenixRuntime.generateColumnInfo(connection, 
qTargetTable, targetColNames);
+            sourceTblColumnMetadata =
+                    PhoenixRuntime.generateColumnInfo(connection, 
qSourceTable, sourceColNames);
+            LOG.info("Target table base query: " + targetTableQuery);
+            md5 = MessageDigest.getInstance("MD5");
+        } catch (SQLException | NoSuchAlgorithmException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, 
Context context)
+            throws IOException, InterruptedException {
+        try {
+            final List<Object> values = record.getValues();
+
+            context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+            currentBatchValues.add(new Pair<>(record.getRowTs(), values));
+            if 
(context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize != 
0) {
+                // if we haven't hit the batch size, just report progress and 
move on to next record
+                context.progress();
+                return;
+            } else {
+                // otherwise, process the batch
+                processBatch(context);
+            }
+            context.progress(); // Make sure progress is reported to 
Application Master.
+        } catch (SQLException | IllegalArgumentException e) {
+            LOG.error(" Error while read/write of a record ", e);
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, 
InterruptedException {
+        super.cleanup(context);
+        if (connection != null) {
+            try {
+                processBatch(context);
+                connection.close();
+                if (outputConn != null) {
+                    outputConn.close();
+                }
+            } catch (SQLException e) {
+                LOG.error("Error while closing connection in the IndexScrutinyMapper class ", e);
+                throw new IOException(e);
+            }
+        }
+    }
+
+    private void processBatch(Context context)
+            throws SQLException, IOException, InterruptedException {
+        if (currentBatchValues.size() == 0) return;
+        
context.getCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).increment(1);
+        // our query selection filter should be the PK columns of the target 
table (index or data
+        // table)
+        String inClause =
+                QueryUtil.constructParameterizedInClause(numTargetPkCols,
+                    currentBatchValues.size());
+        String indexQuery = targetTableQuery + inClause;
+        try (PreparedStatement targetStatement = 
connection.prepareStatement(indexQuery)) {
+            // while we build the PreparedStatement, we also maintain a hash 
of the target table
+            // PKs,
+            // which we use to join against the results of the query on the 
target table
+            Map<String, Pair<Long, List<Object>>> targetPkToSourceValues =
+                    buildTargetStatement(targetStatement);
+
+            // fetch results from the target table and output invalid rows
+            queryTargetTable(context, targetStatement, targetPkToSourceValues);
+
+            // any source values we have left over are invalid (e.g. data 
table rows without
+            // corresponding index row)
+            context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT)
+                    .increment(targetPkToSourceValues.size());
+            if (outputInvalidRows) {
+                for (Pair<Long, List<Object>> sourceRowWithoutTargetRow : 
targetPkToSourceValues.values()) {
+                    List<Object> valuesWithoutTarget = 
sourceRowWithoutTargetRow.getSecond();
+                    if (OutputFormat.FILE.equals(outputFormat)) {
+                        context.write(
+                            new 
Text(Arrays.toString(valuesWithoutTarget.toArray())),
+                            new Text("Target row not found"));
+                    } else if (OutputFormat.TABLE.equals(outputFormat)) {
+                        writeToOutputTable(context, valuesWithoutTarget, null, 
sourceRowWithoutTargetRow.getFirst(), -1L);
+                    }
+                }
+            }
+            if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
+                outputUpsertStmt.executeBatch(); // write out invalid rows to 
output table
+                outputConn.commit();
+            }
+            currentBatchValues.clear();
+        }
+    }
+
+    private Map<String, Pair<Long, List<Object>>> 
buildTargetStatement(PreparedStatement targetStatement)
+            throws SQLException {
+        Map<String, Pair<Long, List<Object>>> targetPkToSourceValues =
+                new HashMap<>(currentBatchValues.size());
+        int rsIndex = 1;
+        for (Pair<Long, List<Object>> batchTsRow : currentBatchValues) {
+            List<Object> batchRow = batchTsRow.getSecond();
+            // our original query against the source table (which provided the 
batchRow) projected
+            // with the data table PK cols first, so the first numTargetPkCols 
form the PK
+            String targetPkHash = getPkHash(batchRow.subList(0, 
numTargetPkCols));
+            targetPkToSourceValues.put(targetPkHash, batchTsRow);
+            for (int i = 0; i < numTargetPkCols; i++) {
+                ColumnInfo targetPkInfo = targetTblColumnMetadata.get(i);
+                Object value = batchRow.get(i);
+                if (value == null) {
+                    targetStatement.setNull(rsIndex++, 
targetPkInfo.getSqlType());
+                } else {
+                    targetStatement.setObject(rsIndex++, value, 
targetPkInfo.getSqlType());
+                }
+            }
+        }
+        return targetPkToSourceValues;
+    }
+
+    private void queryTargetTable(Context context, PreparedStatement 
targetStatement,
+            Map<String, Pair<Long, List<Object>>> targetPkToSourceValues)
+            throws SQLException, IOException, InterruptedException {
+        ResultSet targetResultSet = targetStatement.executeQuery();
+
+        while (targetResultSet.next()) {
+            indxWritable.readFields(targetResultSet);
+            List<Object> targetValues = indxWritable.getValues();
+            // first grab the PK and try to join against the source input
+            // the query is such that first numTargetPkCols of the resultSet 
is the PK
+            List<Object> pkObjects = new ArrayList<>(numTargetPkCols);
+            for (int i = 0; i < numTargetPkCols; i++) {
+                Object pkPart = targetResultSet.getObject(i + 1);
+                pkObjects.add(pkPart);
+            }
+            Long targetTS = 
targetResultSet.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp();
+            String targetPk = getPkHash(pkObjects);
+
+            // use the pk to fetch the source table column values
+            Pair<Long, List<Object>> sourceTsValues = 
targetPkToSourceValues.get(targetPk);
+
+            Long sourceTS = sourceTsValues.getFirst();
+            List<Object> sourceValues = sourceTsValues.getSecond();
+            // compare values starting after the PK (i.e. covered columns)
+            boolean isIndexedCorrectly =
+                    compareValues(numTargetPkCols, targetValues, sourceValues, 
context);
+            if (isIndexedCorrectly) {
+                
context.getCounter(PhoenixScrutinyJobCounters.VALID_ROW_COUNT).increment(1);
+            } else {
+                
context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).increment(1);
+                if (outputInvalidRows) {
+                    outputInvalidRow(context, sourceValues, targetValues, 
sourceTS, targetTS);
+                }
+            }
+            targetPkToSourceValues.remove(targetPk);
+        }
+    }
+
+    private void outputInvalidRow(Context context, List<Object> sourceValues,
+            List<Object> targetValues, long sourceTS, long targetTS) throws 
SQLException, IOException, InterruptedException {
+        if (OutputFormat.FILE.equals(outputFormat)) {
+            context.write(new Text(Arrays.toString(sourceValues.toArray())),
+                new Text(Arrays.toString(targetValues.toArray())));
+        } else if (OutputFormat.TABLE.equals(outputFormat)) {
+            writeToOutputTable(context, sourceValues, targetValues, sourceTS, 
targetTS);
+        }
+    }
+
+    // pass in null targetValues if the target row wasn't found
+    private void writeToOutputTable(Context context, List<Object> 
sourceValues, List<Object> targetValues, long sourceTS, long targetTS)
+            throws SQLException {
+        if 
(context.getCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).getValue() > 
outputMaxRows) {
+            return;
+        }
+        int index = 1;
+        outputUpsertStmt.setString(index++, qSourceTable); // SOURCE_TABLE
+        outputUpsertStmt.setString(index++, qTargetTable); // TARGET_TABLE
+        outputUpsertStmt.setLong(index++, executeTimestamp); // 
SCRUTINY_EXECUTE_TIME
+        outputUpsertStmt.setString(index++, getPkHash(sourceValues.subList(0, 
numSourcePkCols))); // SOURCE_ROW_PK_HASH
+        outputUpsertStmt.setLong(index++, sourceTS); // SOURCE_TS
+        outputUpsertStmt.setLong(index++, targetTS); // TARGET_TS
+        outputUpsertStmt.setBoolean(index++, targetValues != null); // 
HAS_TARGET_ROW
+        index = setStatementObjects(sourceValues, index, 
sourceTblColumnMetadata);
+        if (targetValues != null) {
+            index = setStatementObjects(targetValues, index, 
targetTblColumnMetadata);
+        } else { // for case where target row wasn't found, put nulls in 
prepared statement
+            for (int i = 0; i < sourceValues.size(); i++) {
+                outputUpsertStmt.setNull(index++, 
targetTblColumnMetadata.get(i).getSqlType());
+            }
+        }
+        outputUpsertStmt.addBatch();
+    }
+
+    private int setStatementObjects(List<Object> values, int index, 
List<ColumnInfo> colMetadata)
+            throws SQLException {
+        for (int i = 0; i < values.size(); i++) {
+            Object value = values.get(i);
+            ColumnInfo colInfo = colMetadata.get(i);
+            if (value != null) {
+                outputUpsertStmt.setObject(index++, value, 
colInfo.getSqlType());
+            } else {
+                outputUpsertStmt.setNull(index++, colInfo.getSqlType());
+            }
+        }
+        return index;
+    }
+
+    private boolean compareValues(int startIndex, List<Object> targetValues,
+            List<Object> sourceValues, Context context) throws SQLException {
+        if (targetValues == null || sourceValues == null) return false;
+        for (int i = startIndex; i < sourceValues.size(); i++) {
+            Object targetValue = targetValues.get(i);
+            Object sourceValue = sourceValues.get(i);
+            if (targetValue != null && !targetValue.equals(sourceValue)) {
+                
context.getCounter(PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT)
+                        .increment(1);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private String getPkHash(List<Object> pkObjects) {
+        try {
+            for (int i = 0; i < pkObjects.size(); i++) {
+                
md5.update(sourceTblColumnMetadata.get(i).getPDataType().toBytes(pkObjects.get(i)));
+            }
+            return Hex.encodeHexString(md5.digest());
+        } finally {
+            md5.reset();
+        }
+    }
+}
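
To summarize the mapper's per-batch flow for reviewers: source rows are buffered and keyed by an
MD5 hash of their target-table PK values, the target table is queried once per batch with a
parameterized IN clause, matched rows are compared column by column, and any source rows left
unmatched have no target row. A stripped-down sketch of that join (plain Java with illustrative
names, not the Phoenix-specific types used above; it assumes both row lists carry the PK columns
first):

    import java.security.MessageDigest;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.codec.binary.Hex;

    public class BatchJoinSketch {
        // Illustrative only: join a batch of source rows against target rows by PK hash,
        // mirroring IndexScrutinyMapper#processBatch and #queryTargetTable.
        static void compareBatch(List<List<Object>> sourceBatch, List<List<Object>> targetRows,
                int numPkCols) throws Exception {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            Map<String, List<Object>> pkHashToSourceRow = new HashMap<>();
            for (List<Object> sourceRow : sourceBatch) {
                pkHashToSourceRow.put(pkHash(md5, sourceRow.subList(0, numPkCols)), sourceRow);
            }
            int valid = 0, invalid = 0;
            for (List<Object> targetRow : targetRows) {
                List<Object> sourceRow =
                        pkHashToSourceRow.remove(pkHash(md5, targetRow.subList(0, numPkCols)));
                // compare the covered (non-PK) columns of the joined rows
                boolean indexedCorrectly = sourceRow != null
                        && sourceRow.subList(numPkCols, sourceRow.size())
                                .equals(targetRow.subList(numPkCols, targetRow.size()));
                if (indexedCorrectly) valid++; else invalid++;
            }
            invalid += pkHashToSourceRow.size(); // leftover source rows have no target row
            System.out.println("valid=" + valid + " invalid=" + invalid);
        }

        private static String pkHash(MessageDigest md5, List<Object> pkValues) {
            try {
                for (Object v : pkValues) {
                    md5.update(String.valueOf(v).getBytes());
                }
                return Hex.encodeHexString(md5.digest());
            } finally {
                md5.reset();
            }
        }
    }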

http://git-wip-us.apache.org/repos/asf/phoenix/blob/08e2a29f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
new file mode 100644
index 0000000..411214e
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutput.java
@@ -0,0 +1,345 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import 
org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
+import 
org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.IndexSourceColNames;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ *
+ * IndexScrutiny MapReduce output table DDL and methods to get queries against 
the output tables
+ *
+ */
+public class IndexScrutinyTableOutput {
+
+    /**
+     * This table holds the invalid rows in the source table (either missing a 
target, or a bad
+     * covered column value). Dynamic columns hold the original source and 
target table column data.
+     */
+    public static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_SCRUTINY";
+    public static final String SCRUTINY_EXECUTE_TIME_COL_NAME = 
"SCRUTINY_EXECUTE_TIME";
+    public static final String TARGET_TABLE_COL_NAME = "TARGET_TABLE";
+    public static final String SOURCE_TABLE_COL_NAME = "SOURCE_TABLE";
+    public static final String OUTPUT_TABLE_DDL =
+            "CREATE TABLE IF NOT EXISTS " + OUTPUT_TABLE_NAME + "\n" +
+            "(\n" +
+            "    " + SOURCE_TABLE_COL_NAME + " VARCHAR NOT NULL,\n" +
+            "    " + TARGET_TABLE_COL_NAME + " VARCHAR NOT NULL,\n" +
+            "    " + SCRUTINY_EXECUTE_TIME_COL_NAME + " BIGINT NOT NULL,\n" +
+            "    SOURCE_ROW_PK_HASH VARCHAR NOT NULL,\n" +
+            "    SOURCE_TS BIGINT,\n" +
+            "    TARGET_TS BIGINT,\n" +
+            "    HAS_TARGET_ROW BOOLEAN,\n" +
+            "    CONSTRAINT PK PRIMARY KEY\n" +
+            "    (\n" +
+            "        " + SOURCE_TABLE_COL_NAME + ",\n" +
+            "        " + TARGET_TABLE_COL_NAME + ",\n" +
+            "        " + SCRUTINY_EXECUTE_TIME_COL_NAME + ",\n" + // time at 
which the scrutiny ran
+            "        SOURCE_ROW_PK_HASH\n" + //  this hash makes the PK unique
+            "    )\n" + // dynamic columns consisting of the source and target 
columns will follow
+            ")";
+
+    /**
+     * This table holds metadata about a scrutiny job - result counters and queries to fetch invalid
+     * row data from the output table. The queries contain the dynamic columns, which are equivalent
+     * to the original source/target table columns.
+     */
+    public static final String OUTPUT_METADATA_TABLE_NAME = 
"PHOENIX_INDEX_SCRUTINY_METADATA";
+    public static final String OUTPUT_METADATA_DDL =
+            "CREATE TABLE IF NOT EXISTS " + OUTPUT_METADATA_TABLE_NAME + "\n" +
+            "(\n" +
+            "    " + SOURCE_TABLE_COL_NAME + " VARCHAR NOT NULL,\n" +
+            "    " + TARGET_TABLE_COL_NAME + " VARCHAR NOT NULL,\n" +
+            "    " + SCRUTINY_EXECUTE_TIME_COL_NAME + " BIGINT NOT NULL,\n" +
+            "    SOURCE_TYPE VARCHAR,\n" + // source is either data or index 
table
+            "    CMD_LINE_ARGS VARCHAR,\n" + // arguments the tool was run with
+            "    INPUT_RECORDS BIGINT,\n" +
+            "    FAILED_RECORDS BIGINT,\n" +
+            "    VALID_ROW_COUNT BIGINT,\n" +
+            "    INVALID_ROW_COUNT BIGINT,\n" +
+            "    INCORRECT_COVERED_COL_VAL_COUNT BIGINT,\n" +
+            "    BATCHES_PROCESSED_COUNT BIGINT,\n" +
+            "    SOURCE_DYNAMIC_COLS VARCHAR,\n" +
+            "    TARGET_DYNAMIC_COLS VARCHAR,\n" +
+            "    INVALID_ROWS_QUERY_ALL VARCHAR,\n" + // stored sql query to 
fetch all the invalid rows from the output table
+            "    INVALID_ROWS_QUERY_MISSING_TARGET VARCHAR,\n" +  // stored 
sql query to fetch all the invalid rows which are missing a target row
+            "    INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL VARCHAR,\n" + // 
stored sql query to fetch all the invalid rows which have bad covered column 
values
+            "    CONSTRAINT PK PRIMARY KEY\n" +
+            "    (\n" +
+            "        " + SOURCE_TABLE_COL_NAME + ",\n" +
+            "        " + TARGET_TABLE_COL_NAME + ",\n" +
+            "        " + SCRUTINY_EXECUTE_TIME_COL_NAME + "\n" +
+            "    )\n" +
+            ")\n";
+
+    public static final String UPSERT_METADATA_SQL = "UPSERT INTO " + 
OUTPUT_METADATA_TABLE_NAME + " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+    /**
+     * Gets the parameterized upsert SQL for the output table. Used by the scrutiny MR job to write
+     * its results.
+     * @param sourceDynamicCols list of source columns with their types
+     * @param targetDynamicCols list of target columns with their types
+     * @param connection connection to use
+     * @throws SQLException
+     */
+    public static String constructOutputTableUpsert(List<String> 
sourceDynamicCols,
+            List<String> targetDynamicCols, Connection connection) throws 
SQLException {
+        List<String> outputTableColumns = getOutputTableColumns(connection);
+
+        // construct a dynamic column upsert into the output table
+        List<String> upsertCols =
+                Lists.newArrayList(
+                    Iterables.concat(outputTableColumns, sourceDynamicCols, 
targetDynamicCols));
+        String upsertStmt =
+                
QueryUtil.constructUpsertStatement(IndexScrutinyTableOutput.OUTPUT_TABLE_NAME,
+                    upsertCols, null);
+        return upsertStmt;
+    }
+
+    /**
+     * Get the sql to store as INVALID_ROWS_QUERY_ALL in the output metadata 
table
+     * @param conn
+     * @param columnNames
+     * @param scrutinyTimeMillis
+     * @return
+     * @throws SQLException
+     */
+    public static String getSqlQueryAllInvalidRows(Connection conn,
+            SourceTargetColumnNames columnNames, long scrutinyTimeMillis) 
throws SQLException {
+        String paramQuery = getAllInvalidParamQuery(conn, columnNames);
+        paramQuery = bindPkCols(columnNames, scrutinyTimeMillis, paramQuery);
+        return paramQuery;
+    }
+
+    /**
+     * Get the sql to store as INVALID_ROWS_QUERY_MISSING_TARGET in the output metadata table
+     * @param conn connection to use
+     * @param columnNames source and target column names
+     * @param scrutinyTimeMillis time at which scrutiny was run
+     * @return sql query that returns the invalid rows which are missing a target row
+     * @throws SQLException
+     */
+    public static String getSqlQueryMissingTargetRows(Connection conn,
+            SourceTargetColumnNames columnNames, long scrutinyTimeMillis) throws SQLException {
+        String paramQuery = getHasTargetRowQuery(conn, columnNames, scrutinyTimeMillis);
+        return paramQuery.replaceFirst("\\?", "false");
+    }
+
+    /**
+     * Get the sql to store as INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL in the output metadata table
+     * @param conn connection to use
+     * @param columnNames source and target column names
+     * @param scrutinyTimeMillis time at which scrutiny was run
+     * @return sql query that returns the invalid rows which have bad covered column values
+     * @throws SQLException
+     */
+    public static String getSqlQueryBadCoveredColVal(Connection conn,
+            SourceTargetColumnNames columnNames, long scrutinyTimeMillis) throws SQLException {
+        String paramQuery = getHasTargetRowQuery(conn, columnNames, scrutinyTimeMillis);
+        return paramQuery.replaceFirst("\\?", "true");
+    }
+
+    /**
+     * Query the metadata table for the given columns
+     * @param conn connection to use
+     * @param selectCols columns to select from the metadata table
+     * @param qSourceTableName source table full name
+     * @param qTargetTableName target table full name
+     * @param scrutinyTimeMillis time when scrutiny was run
+     * @return result set over the selected metadata columns for the given scrutiny run
+     * @throws SQLException
+     */
+    public static ResultSet queryMetadata(Connection conn, List<String> selectCols,
+            String qSourceTableName, String qTargetTableName, long scrutinyTimeMillis)
+            throws SQLException {
+        PreparedStatement ps = conn.prepareStatement(constructMetadataParamQuery(selectCols));
+        ps.setString(1, qSourceTableName);
+        ps.setString(2, qTargetTableName);
+        ps.setLong(3, scrutinyTimeMillis);
+        return ps.executeQuery();
+    }
+
+    /**
+     * Query the metadata table for all columns
+     * @param conn connection to use
+     * @param qSourceTableName source table full name
+     * @param qTargetTableName target table full name
+     * @param scrutinyTimeMillis time when scrutiny was run
+     * @return result set over all metadata columns for the given scrutiny run
+     * @throws SQLException
+     */
+    public static ResultSet queryAllMetadata(Connection conn, String qSourceTableName,
+            String qTargetTableName, long scrutinyTimeMillis) throws SQLException {
+        PTable pMetadata = PhoenixRuntime.getTable(conn, OUTPUT_METADATA_TABLE_NAME);
+        List<String> metadataCols = SchemaUtil.getColumnNames(pMetadata.getColumns());
+        return queryMetadata(conn, metadataCols, qSourceTableName, qTargetTableName,
+            scrutinyTimeMillis);
+    }
+
+    /**
+     * Writes the results of the given jobs to the metadata table
+     * @param conn connection to use
+     * @param cmdLineArgs arguments the {@code IndexScrutinyTool} was run with
+     * @param completedJobs completed MR jobs
+     * @throws IOException
+     * @throws SQLException
+     */
+    public static void writeJobResults(Connection conn, String[] cmdLineArgs, List<Job> completedJobs) throws IOException, SQLException {
+        PreparedStatement pStmt = conn.prepareStatement(UPSERT_METADATA_SQL);
+        for (Job job : completedJobs) {
+            Configuration conf = job.getConfiguration();
+            String qDataTable = PhoenixConfigurationUtil.getScrutinyDataTableName(conf);
+            final PTable pdataTable = PhoenixRuntime.getTable(conn, qDataTable);
+            final String qIndexTable = PhoenixConfigurationUtil.getScrutinyIndexTableName(conf);
+            final PTable pindexTable = PhoenixRuntime.getTable(conn, qIndexTable);
+            SourceTable sourceTable = PhoenixConfigurationUtil.getScrutinySourceTable(conf);
+            long scrutinyExecuteTime =
+                    PhoenixConfigurationUtil.getScrutinyExecuteTimestamp(conf);
+            SourceTargetColumnNames columnNames =
+                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
+                            ? new DataSourceColNames(pdataTable,
+                                    pindexTable)
+                            : new IndexSourceColNames(pdataTable,
+                                    pindexTable);
+
+            Counters counters = job.getCounters();
+            int index = 1;
+            pStmt.setString(index++, columnNames.getQualifiedSourceTableName());
+            pStmt.setString(index++, columnNames.getQualifiedTargetTableName());
+            pStmt.setLong(index++, scrutinyExecuteTime);
+            pStmt.setString(index++, sourceTable.name());
+            pStmt.setString(index++, Arrays.toString(cmdLineArgs));
+            pStmt.setLong(index++, counters.findCounter(PhoenixJobCounters.INPUT_RECORDS).getValue());
+            pStmt.setLong(index++, counters.findCounter(PhoenixJobCounters.FAILED_RECORDS).getValue());
+            pStmt.setLong(index++, counters.findCounter(PhoenixScrutinyJobCounters.VALID_ROW_COUNT).getValue());
+            pStmt.setLong(index++, counters.findCounter(PhoenixScrutinyJobCounters.INVALID_ROW_COUNT).getValue());
+            pStmt.setLong(index++, counters.findCounter(PhoenixScrutinyJobCounters.BAD_COVERED_COL_VAL_COUNT).getValue());
+            pStmt.setLong(index++, counters.findCounter(PhoenixScrutinyJobCounters.BATCHES_PROCESSED_COUNT).getValue());
+            pStmt.setString(index++, Arrays.toString(columnNames.getSourceDynamicCols().toArray()));
+            pStmt.setString(index++, Arrays.toString(columnNames.getTargetDynamicCols().toArray()));
+            pStmt.setString(index++, getSqlQueryAllInvalidRows(conn, columnNames, scrutinyExecuteTime));
+            pStmt.setString(index++, getSqlQueryMissingTargetRows(conn, columnNames, scrutinyExecuteTime));
+            pStmt.setString(index++, getSqlQueryBadCoveredColVal(conn, columnNames, scrutinyExecuteTime));
+            pStmt.addBatch();
+        }
+        pStmt.executeBatch();
+        conn.commit();
+    }
+
+    /**
+     * Get the parameterized query used to look up scrutiny job metadata rows in the output
+     * metadata table
+     */
+    static String constructMetadataParamQuery(List<String> metadataSelectCols) {
+        String pkColsCsv = getPksCsv();
+        String query =
+                QueryUtil.constructSelectStatement(OUTPUT_METADATA_TABLE_NAME, metadataSelectCols,
+                    pkColsCsv, null, true);
+        String inClause = " IN " + QueryUtil.constructParameterizedInClause(3, 1);
+        return query + inClause;
+    }
+
+    private static String getAllInvalidParamQuery(Connection conn,
+            SourceTargetColumnNames columnNames) throws SQLException {
+        String whereQuery = constructOutputTableQuery(conn, columnNames, getPksCsv());
+        String inClause = " IN " + QueryUtil.constructParameterizedInClause(getPkCols().size(), 1);
+        String paramQuery = whereQuery + inClause;
+        return paramQuery;
+    }
+
+    private static String bindPkCols(SourceTargetColumnNames columnNames, long scrutinyTimeMillis,
+            String paramQuery) {
+        paramQuery =
+                paramQuery.replaceFirst("\\?",
+                    "'" + columnNames.getQualifiedSourceTableName() + "'");
+        paramQuery =
+                paramQuery.replaceFirst("\\?",
+                    "'" + columnNames.getQualifiedTargetTableName() + "'");
+        paramQuery = paramQuery.replaceFirst("\\?", scrutinyTimeMillis + "");
+        return paramQuery;
+    }
+
+    private static String getHasTargetRowQuery(Connection conn, SourceTargetColumnNames columnNames,
+            long scrutinyTimeMillis) throws SQLException {
+        String whereQuery =
+                constructOutputTableQuery(conn, columnNames,
+                    getPksCsv() + ", " + SchemaUtil.getEscapedFullColumnName("HAS_TARGET_ROW"));
+        String inClause =
+                " IN " + QueryUtil.constructParameterizedInClause(getPkCols().size() + 1, 1);
+        String paramQuery = whereQuery + inClause;
+        paramQuery = bindPkCols(columnNames, scrutinyTimeMillis, paramQuery);
+        return paramQuery;
+    }
+
+    private static String getPksCsv() {
+        String pkColsCsv = Joiner.on(",").join(SchemaUtil.getEscapedFullColumnNames(getPkCols()));
+        return pkColsCsv;
+    }
+
+    private static List<String> getPkCols() {
+        return Arrays.asList(SOURCE_TABLE_COL_NAME, TARGET_TABLE_COL_NAME,
+            SCRUTINY_EXECUTE_TIME_COL_NAME);
+    }
+
+    private static String constructOutputTableQuery(Connection connection,
+            SourceTargetColumnNames columnNames, String conditions) throws SQLException {
+        PTable pOutputTable = PhoenixRuntime.getTable(connection, OUTPUT_TABLE_NAME);
+        List<String> outputTableColumns = SchemaUtil.getColumnNames(pOutputTable.getColumns());
+        List<String> selectCols =
+                Lists.newArrayList(
+                    Iterables.concat(outputTableColumns, columnNames.getUnqualifiedSourceColNames(),
+                        columnNames.getUnqualifiedTargetColNames()));
+        String dynamicCols =
+                Joiner.on(",").join(Iterables.concat(columnNames.getSourceDynamicCols(),
+                    columnNames.getTargetDynamicCols()));
+        // dynamic columns are defined after the table name
+        // https://phoenix.apache.org/dynamic_columns.html
+        String dynamicTableName = OUTPUT_TABLE_NAME + "(" + dynamicCols + ")";
+        return QueryUtil.constructSelectStatement(dynamicTableName, selectCols, conditions, null, true);
+    }
+
+    private static List<String> getOutputTableColumns(Connection connection) throws SQLException {
+        PTable pOutputTable =
+                PhoenixRuntime.getTable(connection, IndexScrutinyTableOutput.OUTPUT_TABLE_NAME);
+        List<String> outputTableColumns = SchemaUtil.getColumnNames(pOutputTable.getColumns());
+        return outputTableColumns;
+    }
+
+}
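
For context, the sketch below (not part of the commit) shows one way the metadata written by writeJobResults() could be read back after a scrutiny run. It is a minimal sketch under stated assumptions: a data-table-as-source run, a local Phoenix JDBC URL, placeholder table names MY_SCHEMA.MY_TABLE and MY_SCHEMA.MY_INDEX, and a scrutiny execute timestamp taken from the tool's output. It uses only queryAllMetadata() and the INVALID_ROW_COUNT / INVALID_ROWS_QUERY_ALL columns defined in the DDL above.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    import org.apache.phoenix.mapreduce.index.IndexScrutinyTableOutput;

    public class ScrutinyResultsExample {
        public static void main(String[] args) throws Exception {
            // Hypothetical connection URL and table names; substitute your own.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                String dataTable = "MY_SCHEMA.MY_TABLE";   // assumed source (data) table
                String indexTable = "MY_SCHEMA.MY_INDEX";  // assumed target (index) table
                long scrutinyTime = 1503959000000L;        // assumed execute timestamp from the tool run

                // Fetch the metadata row that writeJobResults() upserted for this run.
                ResultSet metadata = IndexScrutinyTableOutput.queryAllMetadata(conn, dataTable,
                    indexTable, scrutinyTime);
                if (metadata.next()) {
                    System.out.println("Invalid rows: " + metadata.getLong("INVALID_ROW_COUNT"));

                    // The metadata row also stores a ready-to-run query over the output table;
                    // executing it returns the individual invalid rows for this scrutiny run.
                    String invalidRowsQuery = metadata.getString("INVALID_ROWS_QUERY_ALL");
                    try (Statement stmt = conn.createStatement();
                            ResultSet invalid = stmt.executeQuery(invalidRowsQuery)) {
                        while (invalid.next()) {
                            System.out.println(invalid.getString(1));
                        }
                    }
                }
            }
        }
    }

The same pattern would apply to the stored INVALID_ROWS_QUERY_MISSING_TARGET and INVALID_ROWS_QUERY_BAD_COVERED_COL_VAL queries for narrower views of the invalid rows.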
