[ https://issues.apache.org/jira/browse/PHOENIX-3817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550190#comment-16550190 ]
ASF GitHub Bot commented on PHOENIX-3817: ----------------------------------------- Github user karanmehta93 commented on a diff in the pull request: https://github.com/apache/phoenix/pull/309#discussion_r203932680 --- Diff: phoenix-core/src/it/java/org/apache/phoenix/mapreduce/VerifyReplicationToolIT.java --- @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.*; +import java.util.*; + +import com.google.common.collect.Maps; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +public class VerifyReplicationToolIT extends BaseUniqueNamesOwnClusterIT { + private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationToolIT.class); + private static final String CREATE_USER_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + + " TENANT_ID VARCHAR NOT NULL, USER_ID VARCHAR NOT NULL, AGE INTEGER " + + " CONSTRAINT pk PRIMARY KEY ( TENANT_ID, USER_ID ))"; + private static final String UPSERT_USER = "UPSERT INTO %s VALUES (?, ?, ?)"; + private static final String UPSERT_SELECT_USERS = + "UPSERT INTO %s SELECT TENANT_ID, USER_ID, %d FROM %s WHERE TENANT_ID = ? 
LIMIT %d"; + private static final Random RANDOM = new Random(); + + private static int tenantNum = 0; + private static int userNum = 0; + private static String sourceTableName; + private static String targetTableName; + private List<String> sourceTenants; + private String sourceOnlyTenant; + private String sourceAndTargetTenant; + private String targetOnlyTenant; + + @BeforeClass + public static void createTables() throws Exception { + NUM_SLAVES_BASE = 2; + Map<String,String> props = Maps.newHashMapWithExpectedSize(1); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + Connection conn = DriverManager.getConnection(getUrl()); + sourceTableName = generateUniqueName(); + targetTableName = generateUniqueName(); + // tables will have the same schema, but a different number of regions + conn.createStatement().execute(String.format(CREATE_USER_TABLE, sourceTableName)); + conn.createStatement().execute(String.format(CREATE_USER_TABLE, targetTableName)); + conn.commit(); + } + + @Before + public void setupTenants() throws Exception { + sourceTenants = new ArrayList<>(2); + sourceTenants.add("tenant" + tenantNum++); + sourceTenants.add("tenant" + tenantNum++); + sourceOnlyTenant = sourceTenants.get(0); + sourceAndTargetTenant = sourceTenants.get(1); + targetOnlyTenant = "tenant" + tenantNum++; + upsertData(); + split(sourceTableName, 4); + split(targetTableName, 2); + // ensure scans for each table touch multiple region servers + ensureRegionsOnDifferentServers(sourceTableName); + ensureRegionsOnDifferentServers(targetTableName); + } + + @Test + public void testVerifyRowsMatch() throws Exception { + verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 10, 0, 0, 0, 0); + } + + @Test + public void testVerifySourceOnly() throws Exception { + verify(String.format("TENANT_ID = '%s'", sourceOnlyTenant), 0, 10, 10, 0, 0); + } + + @Test + public void testVerifyTargetOnly() throws Exception { + verify(String.format("TENANT_ID = '%s'", targetOnlyTenant), 
0, 10, 0, 10, 0); + } + + @Test + public void testVerifyRowsDifferent() throws Exception { + // change three rows on the source table so they no longer match on the target + upsertSelectData(sourceTableName, sourceAndTargetTenant, -1, 3); + verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 7, 3, 0, 0, 3); + } + + @Test + public void testVerifyRowsCountMismatch() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + // delete one common row from the target + PreparedStatement deleteTargetStmt = + conn.prepareStatement(String.format("DELETE FROM %s WHERE TENANT_ID = '%s' LIMIT 1", targetTableName, sourceAndTargetTenant)); + deleteTargetStmt.execute(); + conn.commit(); + assertEquals(9, countUsers(targetTableName, sourceAndTargetTenant)); + verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 9, 1, 1, 0, 0); + + } + + private void verify(String sqlConditions, int good, int bad, int onlyInSource, int onlyInTarget, + int contentDifferent) throws Exception { + VerifyReplicationTool.Builder builder = + VerifyReplicationTool.newBuilder(getUtility().getConfiguration()); + builder.tableName(sourceTableName); + builder.targetTableName(targetTableName); + builder.sqlConditions(sqlConditions); + builder.timestamp(EnvironmentEdgeManager.currentTimeMillis()); + VerifyReplicationTool tool = builder.build(); + Job job = tool.createSubmittableJob(); + // use the local runner and cleanup previous output + job.getConfiguration().set("mapreduce.framework.name", "local"); + cleanupPreviousJobOutput(job); + Assert.assertTrue("Job should have completed", job.waitForCompletion(true)); + Counters counters = job.getCounters(); + LOG.info(counters.toString()); + assertEquals(good, + counters.findCounter(VerifyReplicationTool.Verifier.Counter.GOODROWS).getValue()); + assertEquals(bad, + counters.findCounter(VerifyReplicationTool.Verifier.Counter.BADROWS).getValue()); + assertEquals(onlyInSource, counters.findCounter( + 
+ VerifyReplicationTool.Verifier.Counter.ONLY_IN_SOURCE_TABLE_ROWS).getValue()); + assertEquals(onlyInTarget, counters.findCounter( + VerifyReplicationTool.Verifier.Counter.ONLY_IN_TARGET_TABLE_ROWS).getValue()); + assertEquals(contentDifferent, counters.findCounter( + VerifyReplicationTool.Verifier.Counter.CONTENT_DIFFERENT_ROWS).getValue()); + } + + private void upsertData() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); + PreparedStatement sourceStmt = + conn.prepareStatement(String.format(UPSERT_USER, sourceTableName)); + PreparedStatement targetStmt = + conn.prepareStatement(String.format(UPSERT_USER, targetTableName)); + // create 2 tenants with 10 users each in source table + for (int t = 0; t < 2; t++) { --- End diff -- nit: you can simplify this by creating 2 separate loops, just like the one you did on lines 183-185. That will be easier to understand. > VerifyReplication using SQL > --------------------------- > > Key: PHOENIX-3817 > URL: https://issues.apache.org/jira/browse/PHOENIX-3817 > Project: Phoenix > Issue Type: Improvement > Reporter: Alex Araujo > Assignee: Akshita Malhotra > Priority: Minor > Fix For: 4.15.0 > > Attachments: PHOENIX-3817.v1.patch, PHOENIX-3817.v2.patch, > PHOENIX-3817.v3.patch, PHOENIX-3817.v4.patch, PHOENIX-3817.v5.patch, > PHOENIX-3817.v6.patch > > > Certain use cases may copy or replicate a subset of a table to a different > table or cluster. For example, application topologies may map data for > specific tenants to different peer clusters. > It would be useful to have a Phoenix VerifyReplication tool that accepts an > SQL query, a target table, and an optional target cluster. The tool would > compare data returned by the query on the different tables and update various > result counters (similar to HBase's VerifyReplication). -- This message was sent by Atlassian JIRA (v7.6.3#76005)