haridsv commented on code in PR #2379:
URL: https://github.com/apache/phoenix/pull/2379#discussion_r3001509296


##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+   * stored as-is and shared across operations. The caller retains ownership and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
+      + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY VARBINARY_ENCODED,\n"
+      + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
+      + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
+      + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n"
+      + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        FROM_TIME,\n"
+      + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY )" + ") TTL="
+      + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row)
+    throws SQLException {
+
+    // Validate required parameters
+    if (row.getTableName() == null || row.getTableName().isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint");
+    }
+    if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint");
+    }
+    if (row.getType() == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (row.getFromTime() == null || row.getToTime() == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint");
+    }
+
+    try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, row.getTableName());
+      ps.setString(2, row.getTargetCluster());
+      ps.setString(3, row.getType().name());
Review Comment:
   I would recommend storing a byte code rather than a long string to reduce the size of the row key.
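   
   For illustration, a minimal sketch of the enum-with-code pattern this suggests. The constants MAPPER and CHUNK are guesses based on the class Javadoc, and `getCode()`/`fromCode()` are hypothetical helpers, not part of this PR:
   
   ```java
   // Hypothetical sketch: give each Type a stable one-byte code so the row key
   // stores a single byte instead of the enum's name.
   public enum Type {
     MAPPER((byte) 1),
     CHUNK((byte) 2);
   
     private final byte code;
   
     Type(byte code) {
       this.code = code;
     }
   
     public byte getCode() {
       return code;
     }
   
     public static Type fromCode(byte code) {
       for (Type t : values()) {
         if (t.code == code) {
           return t;
         }
       }
       throw new IllegalArgumentException("Unknown Type code: " + code);
     }
   }
   ```
   
   The TYPE column could then be a TINYINT (or single-byte binary) and the upsert would write `ps.setByte(3, row.getType().getCode())` instead of the enum name.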



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java:
##########
@@ -223,4 +233,100 @@ public static void setTenantId(final Job job, final String tenantId) {
     PhoenixConfigurationUtil.setTenantId(job.getConfiguration(), tenantId);
   }
 
+  /**
+   * Validates that start and end times are in the past and start < end.
+   * @param startTime Start timestamp in millis (nullable, defaults to 0)
+   * @param endTime   End timestamp in millis (nullable, defaults to current time)
+   * @param tableName Table name for error messages
+   * @throws IllegalArgumentException if time range is invalid
+   */
+  public static void validateTimeRange(Long startTime, Long endTime, String tableName) {
+    long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+    long st = (startTime == null) ? 0L : startTime;
+    long et = (endTime == null) ? currentTime : endTime;
+
+    if (et > currentTime || st >= et) {
+      throw new IllegalArgumentException(String.format(
+        "%s %s: start and end times must be in the past "
+          + "and start < end. Start: %d, End: %d, Current: %d",
+        INVALID_TIME_RANGE_EXCEPTION_MESSAGE, tableName, st, et, currentTime));
+    }
+  }
+
+  /**
+   * Validates that the end time doesn't exceed the max lookback age configured in Phoenix.
+   * @param configuration Hadoop configuration
+   * @param endTime       End timestamp in millis
+   * @param tableName     Table name for error messages
+   * @throws IllegalArgumentException if endTime is before min allowed timestamp
+   */
+  public static void validateMaxLookbackAge(Configuration configuration, Long endTime,
+    String tableName) {
+    long maxLookBackAge = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
+    if (maxLookBackAge > 0) {
+      long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge;
+      if (endTime < minTimestamp) {
+        throw new IllegalArgumentException(String.format(
+          "Table %s can't look back past the configured max lookback age: %d ms. "
+            + "End time: %d, Min allowed timestamp: %d",
+          tableName, maxLookBackAge, endTime, minTimestamp));
+      }
+    }
+  }
+
+  /**
+   * Validates that a table is suitable for MR operations. Checks table existence, type, and state.
+   * @param connection         Phoenix connection
+   * @param qualifiedTableName Qualified table name
+   * @param allowViews         Whether to allow VIEW tables
+   * @param allowIndexes       Whether to allow INDEX tables
+   * @return PTable instance
+   * @throws SQLException             if connection fails
+   * @throws IllegalArgumentException if validation fails
+   */
+  public static PTable validateTableForMRJob(Connection connection, String qualifiedTableName,

Review Comment:
   How about?
   ```suggestion
     public static PTable getPTableWithValidation(Connection connection, String qualifiedTableName,
   ```



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+   * stored as-is and shared across operations. The caller retains ownership and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
+      + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY VARBINARY_ENCODED,\n"
+      + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
+      + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
+      + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n"
+      + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        FROM_TIME,\n"
+      + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY )" + ") TTL="
+      + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row)
+    throws SQLException {
+
+    // Validate required parameters
+    if (row.getTableName() == null || row.getTableName().isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint");
+    }
+    if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint");
+    }
+    if (row.getType() == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (row.getFromTime() == null || row.getToTime() == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint");
+    }
+
+    try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, row.getTableName());
+      ps.setString(2, row.getTargetCluster());
+      ps.setString(3, row.getType().name());
+      ps.setLong(4, row.getFromTime());
+      ps.setLong(5, row.getToTime());
+      ps.setString(6, row.getTenantId());
+      ps.setBytes(7, row.getStartRowKey());
+      ps.setBytes(8, row.getEndRowKey());
+      ps.setBoolean(9, row.getIsDryRun());
+      ps.setTimestamp(10, row.getExecutionStartTime());
+      ps.setTimestamp(11, row.getExecutionEndTime());
+      ps.setString(12, row.getStatus() != null ? row.getStatus().name() : null);

Review Comment:
   Similar to type, though not as important, it would be better to store a code rather than the name.
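   
   For example, a hedged sketch (the `getCode()` accessor is hypothetical, with the STATUS column typed TINYINT instead of VARCHAR(20)):
   
   ```java
   // Sketch only: persist a compact status code instead of the enum name.
   if (row.getStatus() != null) {
     ps.setByte(12, row.getStatus().getCode());
   } else {
     ps.setNull(12, java.sql.Types.TINYINT);
   }
   ```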



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixMRJobUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across
+ * two HBase clusters (source and target).
+ * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where
+ * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects
+ * which data chunks are out of sync without transferring all the data over the network.
+ * <h2>How It Works</h2>
+ * <ol>
+ * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper
+ * regions based on HBase region boundaries.</li>
+ * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and
+ * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into
+ * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys +
+ * column families + qualifiers + timestamps + values).</li>
+ * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start
+ * key, end key, row count, hash) from both clusters and compares the hashes. Matching hashes mean
+ * the chunk data is identical; mismatched hashes indicate inconsistency.</li>
+ * <li><b>Result Tracking:</b> Results are check pointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT}
+ * table, tracking verified chunks, mismatched chunks, and processing progress for resumable
+ * operations.</li>
+ * </ol>
+ * <h2>Usage Example</h2>
+ *
+ * <pre>
+ * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \ --table-name MY_TABLE \
+ * --target-cluster target-zk1,target-zk2:2181:/hbase
+ * </pre>
+ */
+public class PhoenixSyncTableTool extends Configured implements Tool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class);
+
+  private static final Option SCHEMA_NAME_OPTION =
+    new Option("s", "schema", true, "Phoenix schema name (optional)");
+  private static final Option TABLE_NAME_OPTION =
+    new Option("tn", "table-name", true, "Table name (mandatory)");
+  private static final Option TARGET_CLUSTER_OPTION =
+    new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)");
+  private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true,
+    "Start time in milliseconds for sync (optional, defaults to 0)");
+  private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true,
+    "End time in milliseconds for sync (optional, defaults to current time - 1 hour)");
+  private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false,
+    "Dry run mode - only checkpoint inconsistencies, do not repair (optional)");
+  private static final Option CHUNK_SIZE_OPTION =
+    new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)");
+  private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false,
+    "Run the job in foreground. Default - Runs the job in background.");
+  private static final Option TENANT_ID_OPTION =
+    new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)");
+  private static final Option RAW_SCAN_OPTION = new Option("rs", "raw-scan", false,
+    "Enable raw scan mode to include delete markers (optional, disabled by default)");
+  private static final Option READ_ALL_VERSIONS_OPTION = new Option("rav", "read-all-versions",
+    false,
+    "Enable reading all cell versions (optional, disabled by default, reads only latest version)");
+  private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+  public static final String PHOENIX_SYNC_TABLE_NAME = "phoenix.sync.table.table.name";
+  public static final String PHOENIX_SYNC_TABLE_TARGET_ZK_QUORUM = "phoenix.sync.table.target.zk";
+  public static final String PHOENIX_SYNC_TABLE_FROM_TIME = "phoenix.sync.table.from.time";
+  public static final String PHOENIX_SYNC_TABLE_TO_TIME = "phoenix.sync.table.to.time";
+  public static final String PHOENIX_SYNC_TABLE_DRY_RUN = "phoenix.sync.table.dry.run";
+  public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES =
+    "phoenix.sync.table.chunk.size.bytes";
+  public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1GB
+  public static final String PHOENIX_SYNC_TABLE_RAW_SCAN = "phoenix.sync.table.raw.scan";
+  public static final String PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS =
+    "phoenix.sync.table.read.all.versions";
+
+  private String schemaName;
+  private String tableName;
+  private String targetZkQuorum;
+  private Long startTime;
+  private Long endTime;
+  private boolean isDryRun;
+  private Long chunkSizeBytes;
+  private boolean isForeground;
+  private String tenantId;
+  private boolean isRawScan = false;
+  private boolean isReadAllVersions = false;
+
+  private String qTable;
+  private String qSchemaName;
+
+  private Configuration configuration;
+  private Job job;
+
+  /**
+   * Creates an MR job that uses server-side chunking and checksum calculation
+   * @return Configured MapReduce job ready for submission
+   * @throws Exception if job creation fails
+   */
+  private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception {
+    configureTimeoutsAndRetries(configuration);
+    setPhoenixSyncTableToolConfiguration(configuration);
+    PhoenixMRJobUtil.updateCapacityQueueInfo(configuration);
+    Job job = Job.getInstance(configuration, getJobName());
+    job.setMapperClass(PhoenixSyncTableMapper.class);
+    job.setJarByClass(PhoenixSyncTableTool.class);
+    TableMapReduceUtil.initCredentials(job);
+    TableMapReduceUtil.addDependencyJars(job);
+    configureInput(job, tableType);
+    configureOutput(job);
+    obtainTargetClusterTokens(job);
+    return job;
+  }
+
+  /**
+   * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is
+   * required for cross-cluster kerberos authentication.
+   * @param job The MapReduce job to add tokens
+   */
+  private void obtainTargetClusterTokens(Job job) throws IOException {
+    try {
+      Configuration targetConf =
+        HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum);
+      TableMapReduceUtil.initCredentialsForCluster(job, targetConf);
+    } catch (IOException e) {
+      if (UserProvider.instantiate(job.getConfiguration()).isHBaseSecurityEnabled()) {
+        throw e;
+      }
+      LOGGER.info("Skipping target cluster token acquisition (security not enabled): {}",
+        e.getMessage());
+    }
+  }
+
+  /**
+   * Configures timeouts and retry settings for the sync job
+   */
+  private void configureTimeoutsAndRetries(Configuration configuration) {
+    long syncTableQueryTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT);
+    long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB,
+      QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT);
+    long syncTableClientScannerTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT);
+    int syncTableRpcRetriesCounter =
+      configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER);
+
+    configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      Long.toString(syncTableClientScannerTimeoutMs));
+    configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs));
+    configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      Integer.toString(syncTableRpcRetriesCounter));
+    configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs));
+  }
+
+  private void setPhoenixSyncTableToolConfiguration(Configuration configuration) {
+    setPhoenixSyncTableName(configuration, qTable);
+    setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum);
+    setPhoenixSyncTableFromTime(configuration, startTime);
+    setPhoenixSyncTableToTime(configuration, endTime);
+    setPhoenixSyncTableDryRun(configuration, isDryRun);
+    setPhoenixSyncTableRawScan(configuration, isRawScan);
+    setPhoenixSyncTableReadAllVersions(configuration, isReadAllVersions);
+    PhoenixConfigurationUtil.setSplitByStats(configuration, false);
+    if (chunkSizeBytes != null) {
+      setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
+    }
+    if (tenantId != null) {
+      PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+    }
+    PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+    configuration
+      .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+  }
+
+  private void configureInput(Job job, PTableType tableType) {
+    // With below query plan, we get Input split based on region boundary
+    String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ ";

Review Comment:
   We can't have an index on an index, so I'm wondering what the purpose of this hint is.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java:
##########
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixMRJobUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across
+ * two HBase clusters (source and target).
+ * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where
+ * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects
+ * which data chunks are out of sync without transferring all the data over the network.
+ * <h2>How It Works</h2>
+ * <ol>
+ * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper
+ * regions based on HBase region boundaries.</li>
+ * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and
+ * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into
+ * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys +
+ * column families + qualifiers + timestamps + values).</li>
+ * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start
+ * key, end key, row count, hash) from both clusters and compares the hashes. Matching hashes mean
+ * the chunk data is identical; mismatched hashes indicate inconsistency.</li>
+ * <li><b>Result Tracking:</b> Results are check pointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT}
+ * table, tracking verified chunks, mismatched chunks, and processing progress for resumable
+ * operations.</li>
+ * </ol>
+ * <h2>Usage Example</h2>
+ *
+ * <pre>
+ * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \ --table-name MY_TABLE \
+ * --target-cluster target-zk1,target-zk2:2181:/hbase
+ * </pre>
+ */
+public class PhoenixSyncTableTool extends Configured implements Tool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class);
+
+  private static final Option SCHEMA_NAME_OPTION =
+    new Option("s", "schema", true, "Phoenix schema name (optional)");
+  private static final Option TABLE_NAME_OPTION =
+    new Option("tn", "table-name", true, "Table name (mandatory)");
+  private static final Option TARGET_CLUSTER_OPTION =
+    new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)");
+  private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true,
+    "Start time in milliseconds for sync (optional, defaults to 0)");
+  private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true,
+    "End time in milliseconds for sync (optional, defaults to current time - 1 hour)");
+  private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false,
+    "Dry run mode - only checkpoint inconsistencies, do not repair (optional)");
+  private static final Option CHUNK_SIZE_OPTION =
+    new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)");
+  private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false,
+    "Run the job in foreground. Default - Runs the job in background.");
+  private static final Option TENANT_ID_OPTION =
+    new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)");
+  private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+  private String schemaName;
+  private String tableName;
+  private String targetZkQuorum;
+  private Long startTime;
+  private Long endTime;
+  private boolean isDryRun;
+  private Long chunkSizeBytes;
+  private boolean isForeground;
+  private String tenantId;
+
+  private String qTable;
+  private String qSchemaName;
+
+  private Configuration configuration;
+  private Job job;
+  private PTable pTable;
+
+  /**
+   * Creates an MR job that uses server-side chunking and checksum calculation
+   * @return Configured MapReduce job ready for submission
+   * @throws Exception if job creation fails
+   */
+  private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception {
+    configureTimeoutsAndRetries(configuration);
+    setPhoenixSyncTableToolConfiguration(configuration);
+    PhoenixMRJobUtil.updateCapacityQueueInfo(configuration);
+    Job job = Job.getInstance(configuration, getJobName());
+    job.setMapperClass(PhoenixSyncTableMapper.class);
+    job.setJarByClass(PhoenixSyncTableTool.class);
+    TableMapReduceUtil.initCredentials(job);
+    TableMapReduceUtil.addDependencyJars(job);
+    configureInput(job, tableType);
+    configureOutput(job);
+    obtainTargetClusterTokens(job);
+    return job;
+  }
+
+  /**
+   * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is
+   * required for cross-cluster kerberos authentication.
+   * @param job The MapReduce job to add tokens
+   */
+  private void obtainTargetClusterTokens(Job job) throws IOException {
+    Configuration targetConf =
+      PhoenixMapReduceUtil.createConfigurationForZkQuorum(job.getConfiguration(), targetZkQuorum);
+    TableMapReduceUtil.initCredentialsForCluster(job, targetConf);
+  }
+
+  /**
+   * Configures timeouts and retry settings for the sync job
+   */
+  private void configureTimeoutsAndRetries(Configuration configuration) {
+    long syncTableQueryTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT);
+    long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB,
+      QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT);
+    long syncTableClientScannerTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT);
+    int syncTableRpcRetriesCounter =
+      configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER);
+
+    configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      Long.toString(syncTableClientScannerTimeoutMs));
+    configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs));
+    configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      Integer.toString(syncTableRpcRetriesCounter));
+    configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs));
+  }
+
+  private void setPhoenixSyncTableToolConfiguration(Configuration configuration) {
+    PhoenixConfigurationUtil.setPhoenixSyncTableName(configuration, qTable);
+    PhoenixConfigurationUtil.setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum);
+    PhoenixConfigurationUtil.setPhoenixSyncTableFromTime(configuration, startTime);
+    PhoenixConfigurationUtil.setPhoenixSyncTableToTime(configuration, endTime);
+    PhoenixConfigurationUtil.setPhoenixSyncTableDryRun(configuration, isDryRun);
+    PhoenixConfigurationUtil.setSplitByStats(configuration, false);
+    if (chunkSizeBytes != null) {
+      PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
+    }
+    if (tenantId != null) {
+      PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+    }
+    PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+    configuration
+      .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+  }
+
+  private void configureInput(Job job, PTableType tableType) {
+    // With below query plan, we get Input split based on region boundary
+    String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ ";
+    String selectStatement = "SELECT " + hint + "1 FROM " + qTable;
+    PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class,
+      PhoenixSyncTableInputFormat.class, qTable, selectStatement);
+  }
+
+  private void configureOutput(Job job) {
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+  }
+
+  private String getJobName() {
+    StringBuilder jobName = new StringBuilder("PhoenixSyncTableTool");
+    if (qSchemaName != null) {
+      jobName.append("-").append(qSchemaName);
+    }
+    jobName.append("-").append(tableName);
+    jobName.append("-").append(System.currentTimeMillis());
+    return jobName.toString();
+  }
+
+  public CommandLine parseOptions(String[] args) throws IllegalStateException {
+    Options options = getOptions();
+    CommandLineParser parser = DefaultParser.builder().setAllowPartialMatching(false)
+      .setStripLeadingAndTrailingQuotes(false).build();
+    CommandLine cmdLine = null;
+    try {
+      cmdLine = parser.parse(options, args);
+    } catch (ParseException e) {
+      LOGGER.error("Failed to parse command line options. Args: {}. Error: {}",
+        Arrays.toString(args), e.getMessage(), e);
+      printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+    }
+
+    if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+      printHelpAndExit(options, 0);
+    }
+    requireOption(cmdLine, TABLE_NAME_OPTION);
+    requireOption(cmdLine, TARGET_CLUSTER_OPTION);
+    return cmdLine;
+  }
+
+  private void requireOption(CommandLine cmdLine, Option option) {
+    if (!cmdLine.hasOption(option.getOpt())) {
+      throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter");
+    }
+  }
+
+  private Options getOptions() {
+    Options options = new Options();
+    options.addOption(SCHEMA_NAME_OPTION);
+    options.addOption(TABLE_NAME_OPTION);
+    options.addOption(TARGET_CLUSTER_OPTION);
+    options.addOption(FROM_TIME_OPTION);
+    options.addOption(TO_TIME_OPTION);
+    options.addOption(DRY_RUN_OPTION);
+    options.addOption(CHUNK_SIZE_OPTION);
+    options.addOption(RUN_FOREGROUND_OPTION);
+    options.addOption(TENANT_ID_OPTION);
+    options.addOption(HELP_OPTION);
+    return options;
+  }
+
+  private void printHelpAndExit(String errorMessage, Options options) {
+    System.err.println(errorMessage);
+    printHelpAndExit(options, -1);
+  }
+
+  private void printHelpAndExit(Options options, int exitCode) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("hadoop jar phoenix-server.jar " + PhoenixSyncTableTool.class.getName(),
+      "Synchronize a Phoenix table between source and target clusters", options,
+      "\nExample usage:\n"
+        + "hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n"
+        + "  --table-name MY_TABLE \\\n" + "  --target-cluster <zk_quorum>:2181 \\\n"
+        + "  --dry-run\n",
+      true);
+    System.exit(exitCode);
+  }
+
+  public void populateSyncTableToolAttributes(CommandLine cmdLine) {
+    tableName = cmdLine.getOptionValue(TABLE_NAME_OPTION.getOpt());
+    targetZkQuorum = cmdLine.getOptionValue(TARGET_CLUSTER_OPTION.getOpt());
+    schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+
+    if (cmdLine.hasOption(FROM_TIME_OPTION.getOpt())) {
+      startTime = Long.valueOf(cmdLine.getOptionValue(FROM_TIME_OPTION.getOpt()));
+    } else {
+      startTime = 0L;
+    }
+
+    if (cmdLine.hasOption(TO_TIME_OPTION.getOpt())) {
+      endTime = Long.valueOf(cmdLine.getOptionValue(TO_TIME_OPTION.getOpt()));
+    } else {
+      // Default endTime, current time - 1 hour
+      endTime = EnvironmentEdgeManager.currentTimeMillis() - (60 * 60 * 1000);
+    }
+
+    if (cmdLine.hasOption(CHUNK_SIZE_OPTION.getOpt())) {
+      chunkSizeBytes = Long.valueOf(cmdLine.getOptionValue(CHUNK_SIZE_OPTION.getOpt()));
+      if (chunkSizeBytes <= 0) {
+        throw new IllegalArgumentException(
+          "Chunk size must be a positive value, got: " + chunkSizeBytes);
+      }
+    }
+    if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+      tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+    }
+    isDryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+    isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+    qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
+    qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
+    PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
+    PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);

Review Comment:
   Don't we need to validate that the startTime is within the max lookback 
window? If startTime is beyond the window, background compactions on the source 
and target clusters may have purged different sets of historical versions or 
delete markers. Since these compactions don't run in sync, the tool will see 
different data states on each cluster, leading to false-positive mismatches for 
data that is actually consistent but has simply been cleaned up at different 
times.
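   
   A minimal sketch of the extra check being suggested, mirroring the `validateMaxLookbackAge` helper quoted above (the method name and the `startTime > 0` guard for the "from the beginning" default are assumptions, not part of this PR):
   
   ```java
   // Hypothetical companion to validateMaxLookbackAge: reject a startTime that
   // falls outside the max lookback window, since compactions may have purged
   // different historical versions/delete markers on source vs. target by then.
   public static void validateStartTimeWithinMaxLookback(Configuration configuration,
     Long startTime, String tableName) {
     long maxLookBackAge = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration);
     if (maxLookBackAge > 0 && startTime != null && startTime > 0) {
       long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge;
       if (startTime < minTimestamp) {
         throw new IllegalArgumentException(String.format(
           "Table %s: start time %d is beyond the max lookback window (min allowed: %d); "
             + "compactions may have cleaned up history differently on each cluster",
           tableName, startTime, minTimestamp));
       }
     }
   }
   ```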



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+   * stored as-is and shared across operations. The caller retains ownership and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
+      + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY VARBINARY_ENCODED,\n"
+      + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
+      + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
+      + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n"
+      + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        FROM_TIME,\n"
+      + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY )" + ") TTL="
+      + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",

Review Comment:
   No verification is being done; perhaps you want to say something like this?
   
   ```suggestion
         LOGGER.info("Initialization of checkpoint table {} complete",
   ```



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+   * stored as-is and shared across operations. The caller retains ownership and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
+      + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY VARBINARY_ENCODED,\n"
+      + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
+      + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
+      + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n"
+      + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        FROM_TIME,\n"
+      + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY )" + ") TTL="
+      + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row)
+    throws SQLException {
+
+    // Validate required parameters
+    if (row.getTableName() == null || row.getTableName().isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint");
+    }
+    if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint");
+    }
+    if (row.getType() == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (row.getFromTime() == null || row.getToTime() == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint");
+    }

Review Comment:
   I see you are already using `Preconditions` in a few other places; why not use it here and in the other validation blocks to make them consistent and less verbose?
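   
   For illustration, the same checks written with the shaded `Preconditions` helper that PhoenixSyncTableTool already imports (a sketch, assuming that is the utility meant here):
   
   ```java
   // Equivalent to the if/throw blocks above, but more compact.
   Preconditions.checkArgument(row.getTableName() != null && !row.getTableName().isEmpty(),
     "TableName cannot be null or empty for checkpoint");
   Preconditions.checkArgument(row.getTargetCluster() != null && !row.getTargetCluster().isEmpty(),
     "TargetCluster cannot be null or empty for checkpoint");
   Preconditions.checkArgument(row.getType() != null, "Type cannot be null for checkpoint");
   Preconditions.checkArgument(row.getFromTime() != null && row.getToTime() != null,
     "FromTime and ToTime cannot be null for checkpoint");
   ```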



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+   * stored as-is and shared across operations. The caller retains ownership and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
+      + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY VARBINARY_ENCODED,\n"
+      + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
+      + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
+      + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n"
+      + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        FROM_TIME,\n"
+      + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY )" + ") TTL="
+      + OUTPUT_TABLE_TTL_SECONDS;

Review Comment:
   Why is `UPSERT_CHECKPOINT_SQL` extracted into a constant while this DDL string is built inline? It seems inconsistent.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java:
##########
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixMRJobUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across
+ * two HBase clusters (source and target).
+ * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where
+ * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects
+ * which data chunks are out of sync without transferring all the data over the network.
+ * <h2>How It Works</h2>
+ * <ol>
+ * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper
+ * regions based on HBase region boundaries.</li>
+ * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and
+ * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into
+ * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys +
+ * column families + qualifiers + timestamps + values).</li>
+ * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start
+ * key, end key, row count, hash) from both clusters and compares the hashes. Matching hashes mean
+ * the chunk data is identical; mismatched hashes indicate inconsistency.</li>
+ * <li><b>Result Tracking:</b> Results are checkpointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT}
+ * table, tracking verified chunks, mismatched chunks, and processing progress for resumable
+ * operations.</li>
+ * </ol>
+ * <h2>Usage Example</h2>
+ *
+ * <pre>
+ * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \
+ *   --table-name MY_TABLE \
+ *   --target-cluster target-zk1,target-zk2:2181:/hbase
+ * </pre>
+ */
+public class PhoenixSyncTableTool extends Configured implements Tool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class);
+
+  private static final Option SCHEMA_NAME_OPTION =
+    new Option("s", "schema", true, "Phoenix schema name (optional)");
+  private static final Option TABLE_NAME_OPTION =
+    new Option("tn", "table-name", true, "Table name (mandatory)");
+  private static final Option TARGET_CLUSTER_OPTION =
+    new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)");
+  private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true,
+    "Start time in milliseconds for sync (optional, defaults to 0)");
+  private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true,
+    "End time in milliseconds for sync (optional, defaults to current time - 1 hour)");
+  private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false,
+    "Dry run mode - only checkpoint inconsistencies, do not repair (optional)");
+  private static final Option CHUNK_SIZE_OPTION =
+    new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)");
+  private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false,
+    "Run the job in foreground. Default - Runs the job in background.");
+  private static final Option TENANT_ID_OPTION =
+    new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)");
+  private static final Option RAW_SCAN_OPTION = new Option("rs", "raw-scan", false,
+    "Enable raw scan mode to include delete markers (optional, disabled by default)");
+  private static final Option READ_ALL_VERSIONS_OPTION = new Option("rav", "read-all-versions",
+    false,
+    "Enable reading all cell versions (optional, disabled by default, reads only latest version)");
+  private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+  public static final String PHOENIX_SYNC_TABLE_NAME = "phoenix.sync.table.table.name";
+  public static final String PHOENIX_SYNC_TABLE_TARGET_ZK_QUORUM = "phoenix.sync.table.target.zk";
+  public static final String PHOENIX_SYNC_TABLE_FROM_TIME = "phoenix.sync.table.from.time";
+  public static final String PHOENIX_SYNC_TABLE_TO_TIME = "phoenix.sync.table.to.time";
+  public static final String PHOENIX_SYNC_TABLE_DRY_RUN = "phoenix.sync.table.dry.run";
+  public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES =
+    "phoenix.sync.table.chunk.size.bytes";
+  public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1GB
+  public static final String PHOENIX_SYNC_TABLE_RAW_SCAN = "phoenix.sync.table.raw.scan";
+  public static final String PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS =
+    "phoenix.sync.table.read.all.versions";
+
+  private String schemaName;
+  private String tableName;
+  private String targetZkQuorum;
+  private Long startTime;
+  private Long endTime;
+  private boolean isDryRun;
+  private Long chunkSizeBytes;
+  private boolean isForeground;
+  private String tenantId;
+  private boolean isRawScan = false;
+  private boolean isReadAllVersions = false;
+
+  private String qTable;
+  private String qSchemaName;
+
+  private Configuration configuration;
+  private Job job;
+
+  /**
+   * Creates an MR job that uses server-side chunking and checksum calculation
+   * @return Configured MapReduce job ready for submission
+   * @throws Exception if job creation fails
+   */
+  private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception {
+    configureTimeoutsAndRetries(configuration);
+    setPhoenixSyncTableToolConfiguration(configuration);
+    PhoenixMRJobUtil.updateCapacityQueueInfo(configuration);
+    Job job = Job.getInstance(configuration, getJobName());
+    job.setMapperClass(PhoenixSyncTableMapper.class);
+    job.setJarByClass(PhoenixSyncTableTool.class);
+    TableMapReduceUtil.initCredentials(job);
+    TableMapReduceUtil.addDependencyJars(job);
+    configureInput(job, tableType);
+    configureOutput(job);
+    obtainTargetClusterTokens(job);
+    return job;
+  }
+
+  /**
+   * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is
+   * required for cross-cluster kerberos authentication.
+   * @param job The MapReduce job to add tokens
+   */
+  private void obtainTargetClusterTokens(Job job) throws IOException {
+    try {
+      Configuration targetConf =
+        HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum);
+      TableMapReduceUtil.initCredentialsForCluster(job, targetConf);
+    } catch (IOException e) {
+      if (UserProvider.instantiate(job.getConfiguration()).isHBaseSecurityEnabled()) {
+        throw e;
+      }
+      LOGGER.info("Skipping target cluster token acquisition (security not enabled): {}",
+        e.getMessage());
+    }
+  }
+
+  /**
+   * Configures timeouts and retry settings for the sync job
+   */
+  private void configureTimeoutsAndRetries(Configuration configuration) {
+    long syncTableQueryTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT);
+    long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB,
+      QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT);
+    long syncTableClientScannerTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT);
+    int syncTableRpcRetriesCounter =
+      configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER);
+
+    configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      Long.toString(syncTableClientScannerTimeoutMs));
+    configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs));
+    configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      Integer.toString(syncTableRpcRetriesCounter));
+    configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs));
+  }
+
+  private void setPhoenixSyncTableToolConfiguration(Configuration configuration) {
+    setPhoenixSyncTableName(configuration, qTable);
+    setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum);
+    setPhoenixSyncTableFromTime(configuration, startTime);
+    setPhoenixSyncTableToTime(configuration, endTime);
+    setPhoenixSyncTableDryRun(configuration, isDryRun);
+    setPhoenixSyncTableRawScan(configuration, isRawScan);
+    setPhoenixSyncTableReadAllVersions(configuration, isReadAllVersions);
+    PhoenixConfigurationUtil.setSplitByStats(configuration, false);
+    if (chunkSizeBytes != null) {
+      setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
+    }
+    if (tenantId != null) {
+      PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+    }
+    PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+    configuration
+      .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+  }
+
+  private void configureInput(Job job, PTableType tableType) {
+    // With below query plan, we get Input split based on region boundary
+    String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ ";
+    String selectStatement = "SELECT " + hint + "1 FROM " + qTable;
+    PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class,
+      PhoenixSyncTableInputFormat.class, qTable, selectStatement);

Review Comment:
   I noticed getSplits() calls getSelectStatement(), which seems to override the SELECT_STATEMENT config; am I reading it wrong?
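   
   To make the concern concrete, here is a self-contained toy of the suspected pattern (every name here is a hypothetical stand-in, not a Phoenix or PR API): if getSplits() regenerates the statement instead of reading the configured one, whatever setInput() stored, including the NO_INDEX hint, is silently ignored.
   
   ```java
   import org.apache.hadoop.conf.Configuration;

   // Toy illustration only; the config key and both helpers are made up.
   public class SelectStatementOverrideDemo {
     static final String SELECT_STATEMENT_KEY = "select.statement"; // assumed key

     // Stand-in for setInput(...): stores the caller's SELECT in the config.
     static void setInput(Configuration conf, String select) {
       conf.set(SELECT_STATEMENT_KEY, select);
     }

     // Stand-in for a getSplits() helper that rebuilds the statement itself,
     // ignoring the configured value -- the override being asked about.
     static String getSelectStatement(String table) {
       return "SELECT 1 FROM " + table; // the caller's hint is lost here
     }

     public static void main(String[] args) {
       Configuration conf = new Configuration(false);
       setInput(conf, "SELECT /*+ NO_INDEX */ 1 FROM MY_TABLE");
       System.out.println("configured: " + conf.get(SELECT_STATEMENT_KEY));
       System.out.println("effective:  " + getSelectStatement("MY_TABLE"));
     }
   }
   ```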



##########
phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java:
##########
@@ -337,4 +337,5 @@ public void testIndexToolSourceConfig() {
     sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
     Assert.assertEquals(sourceTable, SourceTable.DATA_TABLE_SOURCE);
   }
+

Review Comment:
   Inadvertent change?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+   * stored as-is and shared across operations. The caller retains ownership and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT 
NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
+      + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY 
VARBINARY_ENCODED,\n"
+      + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
+      + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
+      + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "   
     TABLE_NAME,\n"
+      + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        
FROM_TIME,\n"
+      + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY 
)" + ") TTL="
+      + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row)
+    throws SQLException {
+
+    // Validate required parameters
+    if (row.getTableName() == null || row.getTableName().isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty 
for checkpoint");
+    }
+    if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or 
empty for checkpoint");
+    }
+    if (row.getType() == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (row.getFromTime() == null || row.getToTime() == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null 
for checkpoint");
+    }
+
+    try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, row.getTableName());
+      ps.setString(2, row.getTargetCluster());
+      ps.setString(3, row.getType().name());
+      ps.setLong(4, row.getFromTime());
+      ps.setLong(5, row.getToTime());
+      ps.setString(6, row.getTenantId());
+      ps.setBytes(7, row.getStartRowKey());
+      ps.setBytes(8, row.getEndRowKey());
+      ps.setBoolean(9, row.getIsDryRun());
+      ps.setTimestamp(10, row.getExecutionStartTime());
+      ps.setTimestamp(11, row.getExecutionEndTime());
+      ps.setString(12, row.getStatus() != null ? row.getStatus().name() : null);
+      ps.setString(13, row.getCounters());
+      ps.executeUpdate();
+      connection.commit();
+    }
+  }
+
+  /**
+   * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out
+   * already-processed regions.
+   * @param tableName     Source table name
+   * @param targetCluster Target cluster ZK quorum
+   * @param fromTime      Start timestamp
+   * @param toTime        End timestamp
+   * @param tenantId      Tenant ID
+   * @return List of completed mapper regions
+   */
+  public List<PhoenixSyncTableCheckpointOutputRow> 
getProcessedMapperRegions(String tableName,
+    String targetCluster, Long fromTime, Long toTime, String tenantId) throws 
SQLException {
+
+    StringBuilder queryBuilder = new StringBuilder();
+    queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM ")
+      .append(SYNC_TABLE_CHECKPOINT_TABLE_NAME)
+      .append(" WHERE TABLE_NAME = ?  AND TARGET_CLUSTER = ?")
+      .append(" AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? ");
+
+    // Conditionally build TENANT_ID clause based on whether tenantId is null
+    if (tenantId == null) {
+      queryBuilder.append(" AND TENANT_ID IS NULL");
+    } else {
+      queryBuilder.append(" AND TENANT_ID = ?");
+    }
+
+    queryBuilder.append(
+      " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, 
TENANT_ID, START_ROW_KEY");
+
+    List<PhoenixSyncTableCheckpointOutputRow> results = new ArrayList<>();
+    try (PreparedStatement ps = connection.prepareStatement(queryBuilder.toString())) {
+      int paramIndex = 1;
+      ps.setString(paramIndex++, tableName);
+      ps.setString(paramIndex++, targetCluster);
+      ps.setString(paramIndex++, Type.REGION.name());
+      ps.setLong(paramIndex++, fromTime);
+      ps.setLong(paramIndex++, toTime);
+      // Only bind tenantId parameter if non-null
+      if (tenantId != null) {
+        ps.setString(paramIndex, tenantId);
+      }
+      try (ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+          PhoenixSyncTableCheckpointOutputRow row =
+            new PhoenixSyncTableCheckpointOutputRow.Builder()
+              .setStartRowKey(rs.getBytes("START_ROW_KEY")).setEndRowKey(rs.getBytes("END_ROW_KEY"))
+              .build();
+          results.add(row);
+        }
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip already-processed chunks.
+   * @param tableName         Source table name
+   * @param targetCluster     Target cluster ZK quorum
+   * @param fromTime          Start timestamp
+   * @param toTime            End timestamp
+   * @param tenantId          Tenant ID
+   * @param mapperRegionStart Mapper region start key
+   * @param mapperRegionEnd   Mapper region end key
+   * @return List of processed chunks in the region
+   */
+  public List<PhoenixSyncTableCheckpointOutputRow> getProcessedChunks(String 
tableName,
+    String targetCluster, Long fromTime, Long toTime, String tenantId, byte[] 
mapperRegionStart,
+    byte[] mapperRegionEnd) throws SQLException {
+    StringBuilder queryBuilder = new StringBuilder();
+    queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? "
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?");
+
+    // Conditionally build TENANT_ID clause based on whether tenantId is null
+    if (tenantId == null) {
+      queryBuilder.append(" AND TENANT_ID IS NULL");
+    } else {
+      queryBuilder.append(" AND TENANT_ID = ?");
+    }

Review Comment:
   I don't see a TENANT_ID column on the table; what am I missing?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java:
##########
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixMRJobUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across
+ * two HBase clusters (source and target).
+ * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where
+ * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects
+ * which data chunks are out of sync without transferring all the data over the network.
+ * <h2>How It Works</h2>
+ * <ol>
+ * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper
+ * regions based on HBase region boundaries.</li>
+ * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and
+ * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into
+ * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys +
+ * column families + qualifiers + timestamps + values).</li>
+ * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start
+ * key, end key, row count, hash) from both clusters and compares the hashes. Matching hashes mean
+ * the chunk data is identical; mismatched hashes indicate inconsistency.</li>
+ * <li><b>Result Tracking:</b> Results are checkpointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT}
+ * table, tracking verified chunks, mismatched chunks, and processing progress for resumable
+ * operations.</li>
+ * </ol>
+ * <h2>Usage Example</h2>
+ *
+ * <pre>
+ * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \
+ *   --table-name MY_TABLE \
+ *   --target-cluster target-zk1,target-zk2:2181:/hbase
+ * </pre>
+ */
+public class PhoenixSyncTableTool extends Configured implements Tool {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class);
+
+  private static final Option SCHEMA_NAME_OPTION =
+    new Option("s", "schema", true, "Phoenix schema name (optional)");
+  private static final Option TABLE_NAME_OPTION =
+    new Option("tn", "table-name", true, "Table name (mandatory)");
+  private static final Option TARGET_CLUSTER_OPTION =
+    new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum 
(mandatory)");
+  private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", 
true,
+    "Start time in milliseconds for sync (optional, defaults to 0)");
+  private static final Option TO_TIME_OPTION = new Option("tt", "to-time", 
true,
+    "End time in milliseconds for sync (optional, defaults to current time - 1 
hour)");
+  private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", 
false,
+    "Dry run mode - only checkpoint inconsistencies, do not repair 
(optional)");
+  private static final Option CHUNK_SIZE_OPTION =
+    new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, 
defaults to 1GB)");
+  private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", 
"run-foreground", false,
+    "Run the job in foreground. Default - Runs the job in background.");
+  private static final Option TENANT_ID_OPTION =
+    new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific 
table sync (optional)");
+  private static final Option HELP_OPTION = new Option("h", "help", false, 
"Help");
+
+  private String schemaName;
+  private String tableName;
+  private String targetZkQuorum;
+  private Long startTime;
+  private Long endTime;
+  private boolean isDryRun;
+  private Long chunkSizeBytes;
+  private boolean isForeground;
+  private String tenantId;
+
+  private String qTable;
+  private String qSchemaName;
+
+  private Configuration configuration;
+  private Job job;
+  private PTable pTable;
+
+  /**
+   * Creates an MR job that uses server-side chunking and checksum calculation
+   * @return Configured MapReduce job ready for submission
+   * @throws Exception if job creation fails
+   */
+  private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception {
+    configureTimeoutsAndRetries(configuration);
+    setPhoenixSyncTableToolConfiguration(configuration);
+    PhoenixMRJobUtil.updateCapacityQueueInfo(configuration);
+    Job job = Job.getInstance(configuration, getJobName());
+    job.setMapperClass(PhoenixSyncTableMapper.class);
+    job.setJarByClass(PhoenixSyncTableTool.class);
+    TableMapReduceUtil.initCredentials(job);
+    TableMapReduceUtil.addDependencyJars(job);
+    configureInput(job, tableType);
+    configureOutput(job);
+    obtainTargetClusterTokens(job);
+    return job;
+  }
+
+  /**
+   * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is
+   * required for cross-cluster kerberos authentication.
+   * @param job The MapReduce job to add tokens
+   */
+  private void obtainTargetClusterTokens(Job job) throws IOException {
+    Configuration targetConf =
+      PhoenixMapReduceUtil.createConfigurationForZkQuorum(job.getConfiguration(), targetZkQuorum);
+    TableMapReduceUtil.initCredentialsForCluster(job, targetConf);
+  }
+
+  /**
+   * Configures timeouts and retry settings for the sync job
+   */
+  private void configureTimeoutsAndRetries(Configuration configuration) {
+    long syncTableQueryTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT);
+    long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB,
+      QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT);
+    long syncTableClientScannerTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT);
+    int syncTableRpcRetriesCounter =
+      configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER);
+
+    configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      Long.toString(syncTableClientScannerTimeoutMs));
+    configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs));
+    configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      Integer.toString(syncTableRpcRetriesCounter));
+    configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs));
+  }
+
+  private void setPhoenixSyncTableToolConfiguration(Configuration configuration) {
+    PhoenixConfigurationUtil.setPhoenixSyncTableName(configuration, qTable);
+    PhoenixConfigurationUtil.setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum);
+    PhoenixConfigurationUtil.setPhoenixSyncTableFromTime(configuration, startTime);
+    PhoenixConfigurationUtil.setPhoenixSyncTableToTime(configuration, endTime);
+    PhoenixConfigurationUtil.setPhoenixSyncTableDryRun(configuration, isDryRun);
+    PhoenixConfigurationUtil.setSplitByStats(configuration, false);
+    if (chunkSizeBytes != null) {
+      PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
+    }
+    if (tenantId != null) {
+      PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
+    }
+    PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
+    configuration
+      .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
+  }
+
+  private void configureInput(Job job, PTableType tableType) {
+    // With below query plan, we get Input split based on region boundary
+    String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ ";
+    String selectStatement = "SELECT " + hint + "1 FROM " + qTable;
+    PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class,
+      PhoenixSyncTableInputFormat.class, qTable, selectStatement);
+  }
+
+  private void configureOutput(Job job) {
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+  }
+
+  private String getJobName() {
+    StringBuilder jobName = new StringBuilder("PhoenixSyncTableTool");
+    if (qSchemaName != null) {
+      jobName.append("-").append(qSchemaName);
+    }
+    jobName.append("-").append(tableName);
+    jobName.append("-").append(System.currentTimeMillis());
+    return jobName.toString();
+  }
+
+  public CommandLine parseOptions(String[] args) throws IllegalStateException {
+    Options options = getOptions();
+    CommandLineParser parser = DefaultParser.builder().setAllowPartialMatching(false)
+      .setStripLeadingAndTrailingQuotes(false).build();
+    CommandLine cmdLine = null;
+    try {
+      cmdLine = parser.parse(options, args);
+    } catch (ParseException e) {
+      LOGGER.error("Failed to parse command line options. Args: {}. Error: {}",
+        Arrays.toString(args), e.getMessage(), e);
+      printHelpAndExit("Error parsing command line options: " + 
e.getMessage(), options);
+    }
+
+    if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+      printHelpAndExit(options, 0);
+    }
+    requireOption(cmdLine, TABLE_NAME_OPTION);
+    requireOption(cmdLine, TARGET_CLUSTER_OPTION);
+    return cmdLine;
+  }
+
+  private void requireOption(CommandLine cmdLine, Option option) {
+    if (!cmdLine.hasOption(option.getOpt())) {
+      throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter");
+    }
+  }
+
+  private Options getOptions() {
+    Options options = new Options();
+    options.addOption(SCHEMA_NAME_OPTION);
+    options.addOption(TABLE_NAME_OPTION);
+    options.addOption(TARGET_CLUSTER_OPTION);
+    options.addOption(FROM_TIME_OPTION);
+    options.addOption(TO_TIME_OPTION);
+    options.addOption(DRY_RUN_OPTION);
+    options.addOption(CHUNK_SIZE_OPTION);
+    options.addOption(RUN_FOREGROUND_OPTION);
+    options.addOption(TENANT_ID_OPTION);
+    options.addOption(HELP_OPTION);
+    return options;
+  }
+
+  private void printHelpAndExit(String errorMessage, Options options) {
+    System.err.println(errorMessage);
+    printHelpAndExit(options, -1);
+  }
+
+  private void printHelpAndExit(Options options, int exitCode) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("hadoop jar phoenix-server.jar " + 
PhoenixSyncTableTool.class.getName(),
+      "Synchronize a Phoenix table between source and target clusters", 
options,
+      "\nExample usage:\n"
+        + "hadoop jar phoenix-server.jar 
org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n"
+        + "  --table-name MY_TABLE \\\n" + "  --target-cluster 
<zk_quorum>:2181 \\\n"
+        + "  --dry-run\n",
+      true);
+    System.exit(exitCode);
+  }
+
+  public void populateSyncTableToolAttributes(CommandLine cmdLine) {
+    tableName = cmdLine.getOptionValue(TABLE_NAME_OPTION.getOpt());
+    targetZkQuorum = cmdLine.getOptionValue(TARGET_CLUSTER_OPTION.getOpt());
+    schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+
+    if (cmdLine.hasOption(FROM_TIME_OPTION.getOpt())) {
+      startTime = Long.valueOf(cmdLine.getOptionValue(FROM_TIME_OPTION.getOpt()));
+    } else {
+      startTime = 0L;
+    }
+
+    if (cmdLine.hasOption(TO_TIME_OPTION.getOpt())) {
+      endTime = Long.valueOf(cmdLine.getOptionValue(TO_TIME_OPTION.getOpt()));
+    } else {
+      // Default endTime, current time - 1 hour
+      endTime = EnvironmentEdgeManager.currentTimeMillis() - (60 * 60 * 1000);
+    }
+
+    if (cmdLine.hasOption(CHUNK_SIZE_OPTION.getOpt())) {
+      chunkSizeBytes = Long.valueOf(cmdLine.getOptionValue(CHUNK_SIZE_OPTION.getOpt()));
+      if (chunkSizeBytes <= 0) {
+        throw new IllegalArgumentException(
+          "Chunk size must be a positive value, got: " + chunkSizeBytes);
+      }
+    }
+    if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+      tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+    }
+    isDryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+    isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+    qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
+    qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
+    PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
+    PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable);

Review Comment:
   On the other hand, we should not only enforce that endTime is not outside the window; we should also enforce a "safety buffer" to accommodate data that is still in flight. Even when endTime is within the window, if it is too close to the current time the scan may miss in-flight data and cause false positives. In practice this may not matter, since the time it takes to set up and run the job could be on the order of several minutes, which is enough for the catch-up to complete, but I think it is better to make it explicit by enforcing a safety buffer and make this more deterministic.
   
   If we remove this check and allow endTime to be in the future, the possibility of false positives due to data in flight becomes a lot more pronounced. By enforcing both startTime and endTime, we can ensure a "consistent window" where data is guaranteed to be fully replicated and quiesced on both sides. WDYT?
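   
   A minimal sketch of the kind of guard being proposed (the constant name, default, and method are assumptions for illustration, not existing Phoenix code; it reuses the EnvironmentEdgeManager the tool already imports):
   
   ```java
   // Hypothetical safety-buffer check; the 1-hour default is illustrative only.
   private static final long DEFAULT_SYNC_TABLE_SAFETY_BUFFER_MS = 60 * 60 * 1000L;

   static void validateEndTimeWithSafetyBuffer(long endTime, long safetyBufferMs) {
     long latestSafeEndTime = EnvironmentEdgeManager.currentTimeMillis() - safetyBufferMs;
     if (endTime > latestSafeEndTime) {
       throw new IllegalArgumentException("endTime " + endTime
         + " is inside the safety buffer; latest safe endTime is " + latestSafeEndTime);
     }
   }
   ```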


