Github user karanmehta93 commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/309#discussion_r203933404
  
    --- Diff: 
phoenix-core/src/it/java/org/apache/phoenix/mapreduce/VerifyReplicationToolIT.java
 ---
    @@ -0,0 +1,323 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.phoenix.mapreduce;
    +
    +import java.io.IOException;
    +import java.sql.*;
    +import java.util.*;
    +
    +import com.google.common.collect.Maps;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.MiniHBaseCluster;
    +import org.apache.hadoop.hbase.ServerName;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.HBaseAdmin;
    +import org.apache.hadoop.hbase.master.HMaster;
    +import org.apache.hadoop.hbase.regionserver.HRegionServer;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.hadoop.mapreduce.Counters;
    +import org.apache.hadoop.mapreduce.Job;
    +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
    +import org.apache.phoenix.util.EnvironmentEdgeManager;
    +import org.apache.phoenix.util.ReadOnlyProps;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotEquals;
    +
/**
 * Integration test for the VerifyReplicationTool MapReduce job: seeds a source and a
 * target table with overlapping tenants and asserts the tool's per-row counters.
 */
public class VerifyReplicationToolIT extends BaseUniqueNamesOwnClusterIT {
    private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationToolIT.class);
    // Test table DDL: composite primary key (TENANT_ID, USER_ID) plus one data column.
    private static final String CREATE_USER_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
            " TENANT_ID VARCHAR NOT NULL, USER_ID VARCHAR NOT NULL, AGE INTEGER " +
            " CONSTRAINT pk PRIMARY KEY ( TENANT_ID, USER_ID ))";
    private static final String UPSERT_USER = "UPSERT INTO %s VALUES (?, ?, ?)";
    // Rewrites up to %d existing rows of a tenant with a new AGE value (used to create diffs).
    private static final String UPSERT_SELECT_USERS =
            "UPSERT INTO %s SELECT TENANT_ID, USER_ID, %d FROM %s WHERE TENANT_ID = ? LIMIT %d";
    private static final Random RANDOM = new Random();

    // Monotonic counters so each test method gets fresh tenant/user ids.
    private static int tenantNum = 0;
    private static int userNum = 0;
    private static String sourceTableName;
    private static String targetTableName;
    // Per-test tenant fixtures, re-created in setupTenants().
    private List<String> sourceTenants;
    private String sourceOnlyTenant;          // present only in the source table
    private String sourceAndTargetTenant;     // present in both tables (matching rows)
    private String targetOnlyTenant;          // present only in the target table
    +
    +    @BeforeClass
    +    public static void createTables() throws Exception {
    +        NUM_SLAVES_BASE = 2;
    +        Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
    +        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
    +        Connection conn = DriverManager.getConnection(getUrl());
    +        sourceTableName = generateUniqueName();
    +        targetTableName = generateUniqueName();
    +        // tables will have the same schema, but a different number of 
regions
    +        conn.createStatement().execute(String.format(CREATE_USER_TABLE, 
sourceTableName));
    +        conn.createStatement().execute(String.format(CREATE_USER_TABLE, 
targetTableName));
    +        conn.commit();
    +    }
    +
    +    @Before
    +    public void setupTenants() throws Exception {
    +        sourceTenants = new ArrayList<>(2);
    +        sourceTenants.add("tenant" + tenantNum++);
    +        sourceTenants.add("tenant" + tenantNum++);
    +        sourceOnlyTenant = sourceTenants.get(0);
    +        sourceAndTargetTenant = sourceTenants.get(1);
    +        targetOnlyTenant = "tenant" + tenantNum++;
    +        upsertData();
    +        split(sourceTableName, 4);
    +        split(targetTableName, 2);
    +        // ensure scans for each table touch multiple region servers
    +        ensureRegionsOnDifferentServers(sourceTableName);
    +        ensureRegionsOnDifferentServers(targetTableName);
    +    }
    +
    +    @Test
    +    public void testVerifyRowsMatch() throws Exception {
    +        verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 
10, 0, 0, 0, 0);
    +    }
    +
    +    @Test
    +    public void testVerifySourceOnly() throws Exception {
    +        verify(String.format("TENANT_ID = '%s'", sourceOnlyTenant), 0, 10, 
10, 0, 0);
    +    }
    +
    +    @Test
    +    public void testVerifyTargetOnly() throws Exception {
    +        verify(String.format("TENANT_ID = '%s'", targetOnlyTenant), 0, 10, 
0, 10, 0);
    +    }
    +
    +    @Test
    +    public void testVerifyRowsDifferent() throws Exception {
    +        // change three rows on the source table so they no longer match 
on the target
    +        upsertSelectData(sourceTableName, sourceAndTargetTenant, -1, 3);
    +        verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 
7, 3, 0, 0, 3);
    +    }
    +
    +    @Test
    +    public void testVerifyRowsCountMismatch() throws Exception {
    +        Connection conn = DriverManager.getConnection(getUrl());
    +        // delete one common row from the target
    +        PreparedStatement deleteTargetStmt =
    +            conn.prepareStatement(String.format("DELETE FROM %s WHERE 
TENANT_ID = '%s' LIMIT 1", targetTableName, sourceAndTargetTenant));
    +        deleteTargetStmt.execute();
    +        conn.commit();
    +        assertEquals(9, countUsers(targetTableName, 
sourceAndTargetTenant));
    +        verify(String.format("TENANT_ID = '%s'", sourceAndTargetTenant), 
9, 1, 1, 0, 0);
    +
    +    }
    +
    /**
     * Builds and runs a VerifyReplicationTool job comparing the source and target tables
     * under the given SQL conditions, then asserts every counter matches expectations.
     * The statement order matters: the job config must be switched to the local runner
     * and previous output cleaned up before the job is submitted.
     *
     * @param sqlConditions WHERE-clause fragment selecting the rows to compare
     * @param good expected rows identical in both tables
     * @param bad expected rows that differ in any way
     * @param onlyInSource expected rows present only in the source table
     * @param onlyInTarget expected rows present only in the target table
     * @param contentDifferent expected rows present in both but with different content
     * @throws Exception if the job fails to build or run
     */
    private void verify(String sqlConditions, int good, int bad, int onlyInSource, int onlyInTarget,
            int contentDifferent) throws Exception {
        VerifyReplicationTool.Builder builder =
                VerifyReplicationTool.newBuilder(getUtility().getConfiguration());
        builder.tableName(sourceTableName);
        builder.targetTableName(targetTableName);
        builder.sqlConditions(sqlConditions);
        // compare the tables as of "now" so rows written by this test are visible
        builder.timestamp(EnvironmentEdgeManager.currentTimeMillis());
        VerifyReplicationTool tool = builder.build();
        Job job = tool.createSubmittableJob();
        // use the local runner and cleanup previous output
        job.getConfiguration().set("mapreduce.framework.name", "local");
        cleanupPreviousJobOutput(job);
        Assert.assertTrue("Job should have completed", job.waitForCompletion(true));
        Counters counters = job.getCounters();
        LOG.info(counters.toString());
        assertEquals(good,
                counters.findCounter(VerifyReplicationTool.Verifier.Counter.GOODROWS).getValue());
        assertEquals(bad,
                counters.findCounter(VerifyReplicationTool.Verifier.Counter.BADROWS).getValue());
        assertEquals(onlyInSource, counters.findCounter(
                VerifyReplicationTool.Verifier.Counter.ONLY_IN_SOURCE_TABLE_ROWS).getValue());
        assertEquals(onlyInTarget, counters.findCounter(
                VerifyReplicationTool.Verifier.Counter.ONLY_IN_TARGET_TABLE_ROWS).getValue());
        assertEquals(contentDifferent, counters.findCounter(
                VerifyReplicationTool.Verifier.Counter.CONTENT_DIFFERENT_ROWS).getValue());
    }
    +
    +    private void upsertData() throws SQLException {
    +        Connection conn = DriverManager.getConnection(getUrl());
    +        PreparedStatement sourceStmt =
    +                conn.prepareStatement(String.format(UPSERT_USER, 
sourceTableName));
    +        PreparedStatement targetStmt =
    +                conn.prepareStatement(String.format(UPSERT_USER, 
targetTableName));
    +        // create 2 tenants with 10 users each in source table
    +        for (int t = 0; t < 2; t++) {
    +            for (int u = 1; u <= 10; u++) {
    +                String tenantId = sourceTenants.get(t);
    +                String userId = "user" + userNum++;
    +                int age = RANDOM.nextInt(99) + 1;
    +                upsertData(sourceStmt, tenantId, userId, age);
    +                // add matching users for the shared tenant to the target 
table
    +                if (tenantId.equals(sourceAndTargetTenant)) {
    +                    upsertData(targetStmt, tenantId, userId, age);
    +                }
    +            }
    +        }
    +        // add 1 tenant with 10 users to the target table
    +        for (int u = 1; u <= 10; u++) {
    +            upsertData(targetStmt, targetOnlyTenant);
    +        }
    +
    +
    +        conn.commit();
    +        assertEquals(10, countUsers(sourceTableName, sourceOnlyTenant));
    +        assertEquals(10, countUsers(sourceTableName, 
sourceAndTargetTenant));
    +        assertEquals(0, countUsers(sourceTableName, targetOnlyTenant));
    +        assertEquals(10, countUsers(targetTableName, 
sourceAndTargetTenant));
    +        assertEquals(10, countUsers(targetTableName, targetOnlyTenant));
    +        assertEquals(0, countUsers(targetTableName, sourceOnlyTenant));
    +    }
    +
    +    private void upsertData(PreparedStatement stmt, String tenantId) 
throws SQLException {
    +        String userId = "user" + userNum++;
    +        int age = RANDOM.nextInt(99) + 1;
    +        upsertData(stmt, tenantId, userId, age);
    +    }
    +
    +    private void upsertData(PreparedStatement stmt, String tenantId, 
String userId, int age)
    +            throws SQLException {
    +        int i = 1;
    +        stmt.setString(i++, tenantId);
    +        stmt.setString(i++, userId);
    +        stmt.setInt(i++, age);
    +        stmt.execute();
    +    }
    +
    +    private void upsertSelectData(String tableName, String tenantId, int 
age, int limit)
    +            throws SQLException {
    +        String sql = String.format(UPSERT_SELECT_USERS, tableName, age, 
tableName, limit);
    +        Connection conn = DriverManager.getConnection(getUrl());
    +        PreparedStatement ps = conn.prepareStatement(sql);
    +        ps.setString(1, tenantId);
    +        ps.execute();
    +        conn.commit();
    +        assertEquals(10, countUsers(tableName, tenantId));
    +    }
    +
    +    private int countUsers(String tableName, String tenantId) throws 
SQLException {
    +        Connection conn = DriverManager.getConnection(getUrl());
    +        PreparedStatement ps =
    +                conn.prepareStatement(
    +                        String.format("SELECT COUNT(*) FROM %s WHERE 
TENANT_ID = ?", tableName));
    +        ps.setString(1, tenantId);
    +        ResultSet rs = ps.executeQuery();
    +        rs.next();
    +        return rs.getInt(1);
    +    }
    +
    +    private void cleanupPreviousJobOutput(Job job) throws IOException {
    +        Path outputDir =
    +            new 
Path(job.getConfiguration().get("mapreduce.output.fileoutputformat.outputdir"));
    +        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
    +        if (fs.exists(outputDir)) {
    +            fs.delete(outputDir, true);
    +        }
    +    }
    +
    +    private void split(String tableName, int targetRegions) throws 
Exception {
    +        TableName table = TableName.valueOf(tableName);
    +        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), 
TEST_PROPERTIES).getAdmin();
    +        MiniHBaseCluster cluster = getUtility().getHBaseCluster();
    +        HMaster master = cluster.getMaster();
    +        List<HRegionInfo> regions = admin.getTableRegions(table);
    +        int numRegions = regions.size();
    +        // split the last region in the table until we have the target 
number of regions
    +        while (numRegions < targetRegions) {
    +            HRegionInfo region = regions.get(numRegions - 1);
    +            ServerName serverName =
    +                    master.getAssignmentManager().getRegionStates()
    +                            .getRegionServerOfRegion(region);
    +            byte[] splitPoint =
    --- End diff --
    
    I am not sure this is working as intended. You want the data to be in 
different regions, right, so that multiple map tasks can be spawned?
    The first region has start key `''`, and the split will give it the end key 
`\x0`; the same will happen for all the subsequent splits. Hence all of your data 
will end up in a single region. When I put a breakpoint in 
`PhoenixInputFormat#getSplits()`, the query plan returns just 1 split for the 
tenant.


---

Reply via email to