Github user akshita-malhotra commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/309#discussion_r203943433
--- Diff: phoenix-core/src/it/java/org/apache/phoenix/mapreduce/VerifyReplicationToolIT.java ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.*;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class VerifyReplicationToolIT extends BaseUniqueNamesOwnClusterIT {
+ private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationToolIT.class);
+ private static final String CREATE_USER_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+ " TENANT_ID VARCHAR NOT NULL, USER_ID VARCHAR NOT NULL, AGE INTEGER " +
+ " CONSTRAINT pk PRIMARY KEY ( TENANT_ID, USER_ID ))";
+ private static final String UPSERT_USER = "UPSERT INTO %s VALUES (?, ?, ?)";
+ private static final String UPSERT_SELECT_USERS =
+ "UPSERT INTO %s SELECT TENANT_ID, USER_ID, %d FROM %s WHERE TENANT_ID = ? LIMIT %d";
+ private static final Random RANDOM = new Random();
+
+ private static int tenantNum = 0;
+ private static int userNum = 0;
+ private static String sourceTableName;
+ private static String targetTableName;
+ private List<String> sourceTenants;
+ private String sourceOnlyTenant;
+ private String sourceAndTargetTenant;
+ private String targetOnlyTenant;
+
+ @BeforeClass
+ public static void createTables() throws Exception {
+ NUM_SLAVES_BASE = 2;
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ Connection conn = DriverManager.getConnection(getUrl());
+ sourceTableName = generateUniqueName();
+ targetTableName = generateUniqueName();
+ // tables will have the same schema, but a different number of regions
+ conn.createStatement().execute(String.format(CREATE_USER_TABLE, sourceTableName));
+ conn.createStatement().execute(String.format(CREATE_USER_TABLE, targetTableName));
+ conn.commit();
+ }
+
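+ // Creates fresh tenants for each test: one present only in the source table, one
+ // present in both tables, and one present only in the target table, then splits
+ // both tables so scans span multiple regions and region servers.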
+ @Before
+ public void setupTenants() throws Exception {
+ sourceTenants = new ArrayList<>(2);
+ sourceTenants.add("tenant" + tenantNum++);
+ sourceTenants.add("tenant" + tenantNum++);
+ sourceOnlyTenant = sourceTenants.get(0);
+ sourceAndTargetTenant = sourceTenants.get(1);
+ targetOnlyTenant = "tenant" + tenantNum++;
+ upsertData();
+ split(sourceTableName, 4);
+ split(targetTableName, 2);
+ // ensure scans for each table touch multiple region servers
+ ensureRegionsOnDifferentServers(sourceTableName);
+ ensureRegionsOnDifferentServers(targetTableName);
+ }
+
+ @Test
+ public void testVerifyRowsMatch() throws Exception {
+ verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 10, 0, 0, 0, 0);
+ }
+
+ @Test
+ public void testVerifySourceOnly() throws Exception {
+ verify(String.format("TENANT_ID = '%s'", sourceOnlyTenant), 0, 10, 10, 0, 0);
+ }
+
+ @Test
+ public void testVerifyTargetOnly() throws Exception {
+ verify(String.format("TENANT_ID = '%s'", targetOnlyTenant), 0, 10, 0, 10, 0);
+ }
+
+ @Test
+ public void testVerifyRowsDifferent() throws Exception {
+ // change three rows on the source table so they no longer match on the target
+ upsertSelectData(sourceTableName, sourceAndTargetTenant, -1, 3);
+ verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 7, 3, 0, 0, 3);
+ }
+
+ @Test
+ public void testVerifyRowsCountMismatch() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl());
+ // delete one common row from the target
+ PreparedStatement deleteTargetStmt =
+ conn.prepareStatement(String.format("DELETE FROM %s WHERE TENANT_ID = '%s' LIMIT 1", targetTableName, sourceAndTargetTenant));
+ deleteTargetStmt.execute();
+ conn.commit();
+ assertEquals(9, countUsers(targetTableName, sourceAndTargetTenant));
+ verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 9, 1, 1, 0, 0);
+
+ }
+
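+ // Runs VerifyReplicationTool over the source/target tables with the given SQL
+ // conditions and asserts the expected counter values: good rows, bad rows,
+ // source-only rows, target-only rows, and content-different rows.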
+ private void verify(String sqlConditions, int good, int bad, int onlyInSource, int onlyInTarget,
+ int contentDifferent) throws Exception {
+ VerifyReplicationTool.Builder builder =
+ VerifyReplicationTool.newBuilder(getUtility().getConfiguration());
+ builder.tableName(sourceTableName);
+ builder.targetTableName(targetTableName);
+ builder.sqlConditions(sqlConditions);
+ builder.timestamp(EnvironmentEdgeManager.currentTimeMillis());
+ VerifyReplicationTool tool = builder.build();
+ Job job = tool.createSubmittableJob();
+ // use the local runner and cleanup previous output
+ job.getConfiguration().set("mapreduce.framework.name", "local");
+ cleanupPreviousJobOutput(job);
+ Assert.assertTrue("Job should have completed", job.waitForCompletion(true));
+ Counters counters = job.getCounters();
+ LOG.info(counters.toString());
+ assertEquals(good,
+ counters.findCounter(VerifyReplicationTool.Verifier.Counter.GOODROWS).getValue());
+ assertEquals(bad,
+ counters.findCounter(VerifyReplicationTool.Verifier.Counter.BADROWS).getValue());
+ assertEquals(onlyInSource, counters.findCounter(
+ VerifyReplicationTool.Verifier.Counter.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
+ assertEquals(onlyInTarget, counters.findCounter(
+ VerifyReplicationTool.Verifier.Counter.ONLY_IN_TARGET_TABLE_ROWS).getValue());
+ assertEquals(contentDifferent, counters.findCounter(
+ VerifyReplicationTool.Verifier.Counter.CONTENT_DIFFERENT_ROWS).getValue());
+ }
+
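+ // Seeds both tables: two tenants with 10 users each in the source table (with the
+ // shared tenant mirrored into the target) and one tenant with 10 users that exists
+ // only in the target table.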
+ private void upsertData() throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement sourceStmt =
+ conn.prepareStatement(String.format(UPSERT_USER, sourceTableName));
+ PreparedStatement targetStmt =
+ conn.prepareStatement(String.format(UPSERT_USER, targetTableName));
+ // create 2 tenants with 10 users each in source table
+ for (int t = 0; t < 2; t++) {
+ for (int u = 1; u <= 10; u++) {
+ String tenantId = sourceTenants.get(t);
+ String userId = "user" + userNum++;
+ int age = RANDOM.nextInt(99) + 1;
+ upsertData(sourceStmt, tenantId, userId, age);
+ // add matching users for the shared tenant to the target table
+ if (tenantId.equals(sourceAndTargetTenant)) {
+ upsertData(targetStmt, tenantId, userId, age);
+ }
+ }
+ }
+ // add 1 tenant with 10 users to the target table
+ for (int u = 1; u <= 10; u++) {
+ upsertData(targetStmt, targetOnlyTenant);
+ }
+
+ conn.commit();
+ assertEquals(10, countUsers(sourceTableName, sourceOnlyTenant));
+ assertEquals(10, countUsers(sourceTableName, sourceAndTargetTenant));
+ assertEquals(0, countUsers(sourceTableName, targetOnlyTenant));
+ assertEquals(10, countUsers(targetTableName, sourceAndTargetTenant));
+ assertEquals(10, countUsers(targetTableName, targetOnlyTenant));
+ assertEquals(0, countUsers(targetTableName, sourceOnlyTenant));
+ }
+
+ private void upsertData(PreparedStatement stmt, String tenantId) throws SQLException {
+ String userId = "user" + userNum++;
+ int age = RANDOM.nextInt(99) + 1;
+ upsertData(stmt, tenantId, userId, age);
+ }
+
+ private void upsertData(PreparedStatement stmt, String tenantId, String userId, int age)
+ throws SQLException {
+ int i = 1;
+ stmt.setString(i++, tenantId);
+ stmt.setString(i++, userId);
+ stmt.setInt(i++, age);
+ stmt.execute();
+ }
+
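+ // Overwrites the AGE column with a fixed value for up to 'limit' rows of the given
+ // tenant, leaving the row count unchanged, so the same users carry different content.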
+ private void upsertSelectData(String tableName, String tenantId, int age, int limit)
+ throws SQLException {
+ String sql = String.format(UPSERT_SELECT_USERS, tableName, age, tableName, limit);
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement ps = conn.prepareStatement(sql);
+ ps.setString(1, tenantId);
+ ps.execute();
+ conn.commit();
+ assertEquals(10, countUsers(tableName, tenantId));
+ }
+
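+ // Returns the number of rows for the given tenant in the given table.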
+ private int countUsers(String tableName, String tenantId) throws SQLException {
+ Connection conn = DriverManager.getConnection(getUrl());
+ PreparedStatement ps =
+ conn.prepareStatement(
+ String.format("SELECT COUNT(*) FROM %s WHERE TENANT_ID = ?", tableName));
+ ps.setString(1, tenantId);
+ ResultSet rs = ps.executeQuery();
+ rs.next();
+ return rs.getInt(1);
+ }
+
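+ // Deletes the job's output directory left over from a previous run, if any.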
+ private void cleanupPreviousJobOutput(Job job) throws IOException {
+ Path outputDir =
+ new Path(job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir"));
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ if (fs.exists(outputDir)) {
+ fs.delete(outputDir, true);
+ }
+ }
+
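+ // Splits the table's last region repeatedly until it has the requested number of regions.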
+ private void split(String tableName, int targetRegions) throws Exception {
+ TableName table = TableName.valueOf(tableName);
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TEST_PROPERTIES).getAdmin();
+ MiniHBaseCluster cluster = getUtility().getHBaseCluster();
+ HMaster master = cluster.getMaster();
+ List<HRegionInfo> regions = admin.getTableRegions(table);
+ int numRegions = regions.size();
+ // split the last region in the table until we have the target number of regions
+ while (numRegions < targetRegions) {
+ HRegionInfo region = regions.get(numRegions - 1);
+ ServerName serverName =
+ master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
+ byte[] splitPoint =
--- End diff ---
@karanmehta93: This is part of the code that @aaraujo originally wrote. I haven't had a
chance to dive deeper into it yet; I am still adding more test cases. You should see the
functionality change in the next update. Thanks!
---