Github user karanmehta93 commented on a diff in the pull request: https://github.com/apache/phoenix/pull/309#discussion_r203933404 --- Diff: phoenix-core/src/it/java/org/apache/phoenix/mapreduce/VerifyReplicationToolIT.java --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.*; +import java.util.*; + +import com.google.common.collect.Maps; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class VerifyReplicationToolIT extends BaseUniqueNamesOwnClusterIT { + private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationToolIT.class); + private static final String CREATE_USER_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + + " TENANT_ID VARCHAR NOT NULL, USER_ID VARCHAR NOT NULL, AGE INTEGER " + + " CONSTRAINT pk PRIMARY KEY ( TENANT_ID, USER_ID ))"; + private static final String UPSERT_USER = "UPSERT INTO %s VALUES (?, ?, ?)"; + private static final String UPSERT_SELECT_USERS = + "UPSERT INTO %s SELECT TENANT_ID, USER_ID, %d FROM %s WHERE TENANT_ID = ? 
LIMIT %d"; + private static final Random RANDOM = new Random(); + + private static int tenantNum = 0; + private static int userNum = 0; + private static String sourceTableName; + private static String targetTableName; + private List<String> sourceTenants; + private String sourceOnlyTenant; + private String sourceAndTargetTenant; + private String targetOnlyTenant; + + @BeforeClass + public static void createTables() throws Exception { + NUM_SLAVES_BASE = 2; + Map<String,String> props = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + Connection conn = DriverManager.getConnection(getUrl()); + sourceTableName = generateUniqueName(); + targetTableName = generateUniqueName(); + // tables will have the same schema, but a different number of regions + conn.createStatement().execute(String.format(CREATE_USER_TABLE, sourceTableName)); + conn.createStatement().execute(String.format(CREATE_USER_TABLE, targetTableName)); + conn.commit(); + } + + @Before + public void setupTenants() throws Exception { + sourceTenants = new ArrayList<>(2); + sourceTenants.add("tenant" + tenantNum++); + sourceTenants.add("tenant" + tenantNum++); + sourceOnlyTenant = sourceTenants.get(0); + sourceAndTargetTenant = sourceTenants.get(1); + targetOnlyTenant = "tenant" + tenantNum++; + upsertData(); + split(sourceTableName, 4); + split(targetTableName, 2); + // ensure scans for each table touch multiple region servers + ensureRegionsOnDifferentServers(sourceTableName); + ensureRegionsOnDifferentServers(targetTableName); + } + + @Test + public void testVerifyRowsMatch() throws Exception { + verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 10, 0, 0, 0, 0); + } + + @Test + public void testVerifySourceOnly() throws Exception { + verify(String.format("TENANT_ID = '%s'", sourceOnlyTenant), 0, 10, 10, 0, 0); + } + + @Test + public void testVerifyTargetOnly() throws Exception { + verify(String.format("TENANT_ID = '%s'", targetOnlyTenant), 
0, 10, 0, 10, 0); + } + + @Test + public void testVerifyRowsDifferent() throws Exception { + // change three rows on the source table so they no longer match on the target + upsertSelectData(sourceTableName, sourceAndTargetTenant, -1, 3); + verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 7, 3, 0, 0, 3); + } + + @Test + public void testVerifyRowsCountMismatch() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + // delete one common row from the target + PreparedStatement deleteTargetStmt = + conn.prepareStatement(String.format("DELETE FROM %s WHERE TENANT_ID = '%s' LIMIT 1", targetTableName, sourceAndTargetTenant)); + deleteTargetStmt.execute(); + conn.commit(); + assertEquals(9, countUsers(targetTableName, sourceAndTargetTenant)); + verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 9, 1, 1, 0, 0); + + } + + private void verify(String sqlConditions, int good, int bad, int onlyInSource, int onlyInTarget, + int contentDifferent) throws Exception { + VerifyReplicationTool.Builder builder = + VerifyReplicationTool.newBuilder(getUtility().getConfiguration()); + builder.tableName(sourceTableName); + builder.targetTableName(targetTableName); + builder.sqlConditions(sqlConditions); + builder.timestamp(EnvironmentEdgeManager.currentTimeMillis()); + VerifyReplicationTool tool = builder.build(); + Job job = tool.createSubmittableJob(); + // use the local runner and cleanup previous output + job.getConfiguration().set("mapreduce.framework.name", "local"); + cleanupPreviousJobOutput(job); + Assert.assertTrue("Job should have completed", job.waitForCompletion(true)); + Counters counters = job.getCounters(); + LOG.info(counters.toString()); + assertEquals(good, + counters.findCounter(VerifyReplicationTool.Verifier.Counter.GOODROWS).getValue()); + assertEquals(bad, + counters.findCounter(VerifyReplicationTool.Verifier.Counter.BADROWS).getValue()); + assertEquals(onlyInSource, counters.findCounter( + 
VerifyReplicationTool.Verifier.Counter.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); + assertEquals(onlyInTarget, counters.findCounter( + VerifyReplicationTool.Verifier.Counter.ONLY_IN_TARGET_TABLE_ROWS).getValue()); + assertEquals(contentDifferent, counters.findCounter( + VerifyReplicationTool.Verifier.Counter.CONTENT_DIFFERENT_ROWS).getValue()); + } + + private void upsertData() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement sourceStmt = + conn.prepareStatement(String.format(UPSERT_USER, sourceTableName)); + PreparedStatement targetStmt = + conn.prepareStatement(String.format(UPSERT_USER, targetTableName)); + // create 2 tenants with 10 users each in source table + for (int t = 0; t < 2; t++) { + for (int u = 1; u <= 10; u++) { + String tenantId = sourceTenants.get(t); + String userId = "user" + userNum++; + int age = RANDOM.nextInt(99) + 1; + upsertData(sourceStmt, tenantId, userId, age); + // add matching users for the shared tenant to the target table + if (tenantId.equals(sourceAndTargetTenant)) { + upsertData(targetStmt, tenantId, userId, age); + } + } + } + // add 1 tenant with 10 users to the target table + for (int u = 1; u <= 10; u++) { + upsertData(targetStmt, targetOnlyTenant); + } + + + conn.commit(); + assertEquals(10, countUsers(sourceTableName, sourceOnlyTenant)); + assertEquals(10, countUsers(sourceTableName, sourceAndTargetTenant)); + assertEquals(0, countUsers(sourceTableName, targetOnlyTenant)); + assertEquals(10, countUsers(targetTableName, sourceAndTargetTenant)); + assertEquals(10, countUsers(targetTableName, targetOnlyTenant)); + assertEquals(0, countUsers(targetTableName, sourceOnlyTenant)); + } + + private void upsertData(PreparedStatement stmt, String tenantId) throws SQLException { + String userId = "user" + userNum++; + int age = RANDOM.nextInt(99) + 1; + upsertData(stmt, tenantId, userId, age); + } + + private void upsertData(PreparedStatement stmt, String tenantId, String userId, int 
age) + throws SQLException { + int i = 1; + stmt.setString(i++, tenantId); + stmt.setString(i++, userId); + stmt.setInt(i++, age); + stmt.execute(); + } + + private void upsertSelectData(String tableName, String tenantId, int age, int limit) + throws SQLException { + String sql = String.format(UPSERT_SELECT_USERS, tableName, age, tableName, limit); + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement ps = conn.prepareStatement(sql); + ps.setString(1, tenantId); + ps.execute(); + conn.commit(); + assertEquals(10, countUsers(tableName, tenantId)); + } + + private int countUsers(String tableName, String tenantId) throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement ps = + conn.prepareStatement( + String.format("SELECT COUNT(*) FROM %s WHERE TENANT_ID = ?", tableName)); + ps.setString(1, tenantId); + ResultSet rs = ps.executeQuery(); + rs.next(); + return rs.getInt(1); + } + + private void cleanupPreviousJobOutput(Job job) throws IOException { + Path outputDir = + new Path(job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir")); + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + if (fs.exists(outputDir)) { + fs.delete(outputDir, true); + } + } + + private void split(String tableName, int targetRegions) throws Exception { + TableName table = TableName.valueOf(tableName); + HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TEST_PROPERTIES).getAdmin(); + MiniHBaseCluster cluster = getUtility().getHBaseCluster(); + HMaster master = cluster.getMaster(); + List<HRegionInfo> regions = admin.getTableRegions(table); + int numRegions = regions.size(); + // split the last region in the table until we have the target number of regions + while (numRegions < targetRegions) { + HRegionInfo region = regions.get(numRegions - 1); + ServerName serverName = + master.getAssignmentManager().getRegionStates() + .getRegionServerOfRegion(region); + byte[] splitPoint = --- 
End diff -- I am not sure this is working as intended. You want the data to be spread across different regions so that multiple map tasks can be spawned, right? The first region has start key `''`, and the split will give it the end key `\x0`; the same will happen for every subsequent split. Hence all of your data will end up in a single region. When I put a breakpoint in `PhoenixInputFormat#getSplits()`, the query plan returned just 1 split for the tenant.
---