Github user akshita-malhotra commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/309#discussion_r204181390
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/VerifyReplicationTool.java
---
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * Map only job that compares data across a source and target table. The target table can be on the
+ * same cluster or on a remote cluster. SQL conditions may be specified to compare only a subset of
+ * both tables.
+ */
+public class VerifyReplicationTool implements Tool {
+    private static final Logger LOG = LoggerFactory.getLogger(VerifyReplicationTool.class);
+
+    static final Option ZK_QUORUM_OPT =
+            new Option("z", "zookeeper", true, "ZooKeeper connection details (optional)");
+    static final Option TABLE_NAME_OPT =
+            new Option("t", "table", true, "Phoenix table name (required)");
+    static final Option TARGET_TABLE_NAME_OPT =
+            new Option("tt", "target-table", true, "Target Phoenix table name (optional)");
+    static final Option TARGET_ZK_QUORUM_OPT =
+            new Option("tz", "target-zookeeper", true,
+                    "Target ZooKeeper connection details (optional)");
+    static final Option CONDITIONS_OPT =
+            new Option("c", "conditions", true,
+                    "Conditions for select query WHERE clause (optional)");
+    static final Option TIMESTAMP =
+            new Option("ts", "timestamp", true,
+                    "Timestamp in millis used to compare the two tables. Defaults to current time minus 60 seconds");
+
+    static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+
+    private Configuration conf;
+
+    private String zkQuorum;
+    private String tableName;
+    private String targetTableName;
+    private String targetZkQuorum;
+    private String sqlConditions;
+    private long timestamp;
+
+    VerifyReplicationTool(Configuration conf) {
+        this.conf = Preconditions.checkNotNull(conf, "Configuration cannot be null");
+    }
+
+    public static Builder newBuilder(Configuration conf) {
+        return new Builder(conf);
+    }
+
+    public static class Verifier
+            extends Mapper<NullWritable, VerifyReplicationSourceWritable, NullWritable, NullWritable> {
+
+        private QueryPlan targetQueryPlan;
+        private PhoenixResultSet targetResultSet = null;
+        private boolean targetHasNext;
+        private boolean sourceHasData;
+
+        public enum Counter {
+            GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_TARGET_TABLE_ROWS,
+            CONTENT_DIFFERENT_ROWS
+        }
+
+        @Override
+        protected void setup(Context context) throws IOException, InterruptedException {
+            super.setup(context);
+            try {
+                targetQueryPlan = PhoenixMapReduceUtil.getQueryPlan(context.getConfiguration(), true);
+            } catch (SQLException e) {
+                throw new IOException(e.getMessage());
+            }
+
+        }
+
+        @Override
+        protected void map(NullWritable key, VerifyReplicationSourceWritable value, Context context)
+                throws IOException, InterruptedException {
+
+            sourceHasData = true;
+            try {
+                if(targetResultSet == null) {
+                    Configuration conf = context.getConfiguration();
+                    byte[] targetStartRow = null;
+                    byte[] targetStopRow = null;
+                    // find source table split
+                    PhoenixInputSplit sourceInputSplit = (PhoenixInputSplit) context.getInputSplit();
+                    if(key != null) {
+                        targetStartRow = value.getSourceKey().get();
+                    }
+                    if(sourceInputSplit.getLength() != 0) {
+                        targetStopRow = sourceInputSplit.getKeyRange().getUpperRange();
+                    }
+                    getTargetResultSet(conf, targetStartRow, targetStopRow);
+                }
+
+                while(true) {
+                    if(!targetHasNext) {
+                        logFailRowAndIncrementCounter(context, Counter.ONLY_IN_SOURCE_TABLE_ROWS, value.getSourceKey());
+                        break;
+                    }
+                    ImmutableBytesWritable targetKey = null;
+                    if (targetResultSet.getCurrentRow() != null) {
+                        targetKey = new ImmutableBytesWritable();
+                        targetResultSet.getCurrentRow().getKey(targetKey);
+                    }
+
+                    int keyCompare;
+                    if(key == null){
+                        keyCompare = 1;
+                    } else {
+                        keyCompare = Bytes.compareTo(value.getSourceKey().get(), targetKey.get());
+                    }
+                    if(keyCompare == 0) {
+                        Map<String, Object> sourceValues = value.getSourceResults();
+                        Map<String, Object> targetValues = null;
+                        PhoenixRecordWritable targetResults = new PhoenixRecordWritable();
+                        targetResults.readFields(targetResultSet);
+                        if (targetResults.getResultMap() != null) {
+                            targetValues = targetResults.getResultMap();
+                        }
+                        boolean valuesMatch = true;
+                        if (sourceValues == null) {
+                            if (targetValues != null) {
+                                valuesMatch = false;
+                            }
+                        } else if (!sourceValues.equals(targetValues)) {
+                            valuesMatch = false;
+                        }
+                        if (!valuesMatch) {
+                            logFailRowAndIncrementCounter(context, Counter.CONTENT_DIFFERENT_ROWS, value.getSourceKey());
+                        } else {
+                            context.getCounter(Counter.GOODROWS).increment(1);
+                        }
+                        targetHasNext = targetResultSet.next();
+                        break;
+                    } else if (keyCompare < 0) {
+                        // row only exists in source table
+                        logFailRowAndIncrementCounter(context, Counter.ONLY_IN_SOURCE_TABLE_ROWS, value.getSourceKey());
+                        break;
+                    } else {
+                        // row only exists in target table
+                        logFailRowAndIncrementCounter(context, Counter.ONLY_IN_TARGET_TABLE_ROWS,
+                                targetKey);
+                        targetHasNext = targetResultSet.next();
+                    }
+                }
+            } catch (SQLException e) {
+                throw new IOException(e.getMessage());
+            }
+        }
+
+        @Override
+        protected void cleanup(Context context) throws IOException, InterruptedException {
+            super.cleanup(context);
+            try {
+                if (!sourceHasData && targetResultSet == null) {
+                    getTargetResultSet(context.getConfiguration(), null, null);
+                }
+                if(targetResultSet != null) {
+                    while (targetHasNext) {
+                        ImmutableBytesWritable targetKey = null;
+
+                        if (targetResultSet.getCurrentRow() != null) {
+                            targetKey = new ImmutableBytesWritable();
+                            targetResultSet.getCurrentRow().getKey(targetKey);
--- End diff --
@twdsilva That's correct.
---
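
The class javadoc in the diff above describes a map-only job that compares a source Phoenix table against a target table, possibly on a remote cluster. A minimal launch sketch follows, assuming the tool is driven through Hadoop's ToolRunner and that its run() method (not shown in this excerpt) parses the option flags declared above; the driver class, table name, quorum, and conditions are placeholders, and the class is placed in org.apache.phoenix.mapreduce only because the constructor shown in the diff is package-private:

    package org.apache.phoenix.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;

    // Hypothetical driver, not part of the pull request.
    public class VerifyReplicationLaunchSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Flag names come from the Option declarations in the diff:
            // -t source table (required), -tz target ZooKeeper quorum, -c WHERE-clause conditions.
            String[] toolArgs = new String[] {
                    "-t", "MY_TABLE",
                    "-tz", "target-zk-host:2181",
                    "-c", "TENANT_ID = 'acme'"
            };
            int exitCode = ToolRunner.run(conf, new VerifyReplicationTool(conf), toolArgs);
            System.exit(exitCode);
        }
    }

After the job completes, the Verifier counters defined in the diff (GOODROWS, CONTENT_DIFFERENT_ROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_TARGET_TABLE_ROWS) summarize how the two tables compare.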