haridsv commented on code in PR #2379:
URL: https://github.com/apache/phoenix/pull/2379#discussion_r3004205492


##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableCheckpointOutputRow.java:
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Data model class representing required row in the 
PHOENIX_SYNC_TABLE_CHECKPOINT table
+ */
+public class PhoenixSyncTableCheckpointOutputRow {
+
+  public enum Type {
+    CHUNK,
+    REGION
+  }
+
+  public enum Status {
+    VERIFIED,
+    MISMATCHED
+  }
+
+  private String tableName;
+  private String targetCluster;
+  private Type type;
+  private Long fromTime;
+  private Long toTime;
+  private String tenantId;
+  private Boolean isDryRun;
+  private byte[] startRowKey;
+  private byte[] endRowKey;
+  private Timestamp executionStartTime;
+  private Timestamp executionEndTime;
+  private Status status;
+  private String counters;
+
+  @Override
+  public String toString() {
+    return String.format("SyncOutputRow[table=%s, target=%s, type=%s, 
start=%s, end=%s, status=%s]",
+      tableName, targetCluster, type, Bytes.toStringBinary(startRowKey),
+      Bytes.toStringBinary(endRowKey), status);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PhoenixSyncTableCheckpointOutputRow that = 
(PhoenixSyncTableCheckpointOutputRow) o;
+    return Objects.equals(tableName, that.tableName)
+      && Objects.equals(targetCluster, that.targetCluster) && type == that.type
+      && Objects.equals(fromTime, that.fromTime) && Objects.equals(toTime, 
that.toTime)
+      && Objects.equals(tenantId, that.tenantId) && Objects.equals(isDryRun, 
that.isDryRun)
+      && Arrays.equals(startRowKey, that.startRowKey) && 
Arrays.equals(endRowKey, that.endRowKey)
+      && Objects.equals(executionStartTime, that.executionStartTime)
+      && Objects.equals(executionEndTime, that.executionEndTime) && status == 
that.status
+      && Objects.equals(counters, that.counters);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = Objects.hash(tableName, targetCluster, type, fromTime, 
toTime, tenantId, isDryRun,
+      executionStartTime, executionEndTime, status, counters);
+    result = 31 * result + Arrays.hashCode(startRowKey);
+    result = 31 * result + Arrays.hashCode(endRowKey);
+    return result;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getTargetCluster() {
+    return targetCluster;
+  }
+
+  public Type getType() {
+    return type;
+  }
+
+  public Long getFromTime() {
+    return fromTime;
+  }
+
+  public Long getToTime() {
+    return toTime;
+  }
+
+  public String getTenantId() {
+    return tenantId;
+  }
+
+  public Boolean getIsDryRun() {
+    return isDryRun;
+  }
+
+  public byte[] getStartRowKey() {
+    return startRowKey != null ? Arrays.copyOf(startRowKey, 
startRowKey.length) : null;
+  }
+
+  public byte[] getEndRowKey() {
+    return endRowKey != null ? Arrays.copyOf(endRowKey, endRowKey.length) : 
null;
+  }
+
+  public Timestamp getExecutionStartTime() {
+    return executionStartTime;
+  }
+
+  public Timestamp getExecutionEndTime() {
+    return executionEndTime;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public String getCounters() {
+    return counters;
+  }
+
+  @VisibleForTesting
+  public long getSourceRowsProcessed() {
+    return 
parseCounterValue(PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED.name());
+  }
+
+  @VisibleForTesting
+  public long getTargetRowsProcessed() {
+    return 
parseCounterValue(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name());
+  }
+
+  private long parseCounterValue(String counterName) {
+    if (counters == null || counters.isEmpty()) {
+      return 0;
+    }
+
+    String[] pairs = counters.split(",");
+    for (String pair : pairs) {
+      String[] keyValue = pair.split("=");
+      if (keyValue.length == 2 && keyValue[0].trim().equals(counterName)) {
+        return Long.parseLong(keyValue[1].trim());
+      }
+    }
+    return 0;
+  }

Review Comment:
   The counters string is generated outside this class, while the parsing is 
done inside it; this mismatch in abstraction levels is not good. I suggest you 
either encapsulate both operations in this class or move both into a single 
utility class.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table 
stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level 
checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed 
chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = 
"PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 
days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, 
FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, 
EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: 
The connection is
+   * stored as-is and shared across operations. The caller retains ownership 
and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository 
lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT 
NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
+      + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY 
VARBINARY_ENCODED,\n"
+      + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
+      + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
+      + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "   
     TABLE_NAME,\n"
+      + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        
FROM_TIME,\n"
+      + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY 
)" + ") TTL="
+      + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow 
row)
+    throws SQLException {
+
+    // Validate required parameters
+    if (row.getTableName() == null || row.getTableName().isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty 
for checkpoint");
+    }
+    if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or 
empty for checkpoint");
+    }
+    if (row.getType() == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (row.getFromTime() == null || row.getToTime() == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null 
for checkpoint");
+    }
+
+    try (PreparedStatement ps = 
connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, row.getTableName());
+      ps.setString(2, row.getTargetCluster());
+      ps.setString(3, row.getType().name());
+      ps.setLong(4, row.getFromTime());
+      ps.setLong(5, row.getToTime());
+      ps.setString(6, row.getTenantId());
+      ps.setBytes(7, row.getStartRowKey());
+      ps.setBytes(8, row.getEndRowKey());
+      ps.setBoolean(9, row.getIsDryRun());
+      ps.setTimestamp(10, row.getExecutionStartTime());
+      ps.setTimestamp(11, row.getExecutionEndTime());
+      ps.setString(12, row.getStatus() != null ? row.getStatus().name() : 
null);
+      ps.setString(13, row.getCounters());
+      ps.executeUpdate();
+      connection.commit();
+    }
+  }
+
+  /**
+   * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat 
to filter out
+   * already-processed regions.
+   * @param tableName     Source table name
+   * @param targetCluster Target cluster ZK quorum
+   * @param fromTime      Start timestamp
+   * @param toTime        End timestamp
+   * @param tenantId      Tenant ID
+   * @return List of completed mapper regions
+   */
+  public List<PhoenixSyncTableCheckpointOutputRow> 
getProcessedMapperRegions(String tableName,
+    String targetCluster, Long fromTime, Long toTime, String tenantId) throws 
SQLException {
+
+    StringBuilder queryBuilder = new StringBuilder();
+    queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM ")
+      .append(SYNC_TABLE_CHECKPOINT_TABLE_NAME)
+      .append(" WHERE TABLE_NAME = ?  AND TARGET_CLUSTER = ?")
+      .append(" AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? ");
+
+    // Conditionally build TENANT_ID clause based on whether tenantId is null
+    if (tenantId == null) {
+      queryBuilder.append(" AND TENANT_ID IS NULL");
+    } else {
+      queryBuilder.append(" AND TENANT_ID = ?");
+    }
+
+    queryBuilder.append(
+      " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, 
TENANT_ID, START_ROW_KEY");

Review Comment:
   This is already the PK order, correct? Why do we need an explicit ORDER BY?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table 
stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level 
checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed 
chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = 
"PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 
days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, 
FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, 
EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: 
The connection is
+   * stored as-is and shared across operations. The caller retains ownership 
and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository 
lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT 
NULL,\n"
+      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
+      + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
+      + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY 
VARBINARY_ENCODED,\n"
+      + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
+      + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
+      + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "   
     TABLE_NAME,\n"
+      + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        
FROM_TIME,\n"
+      + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY 
)" + ") TTL="
+      + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow 
row)
+    throws SQLException {
+
+    // Validate required parameters
+    if (row.getTableName() == null || row.getTableName().isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty 
for checkpoint");
+    }
+    if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or 
empty for checkpoint");
+    }
+    if (row.getType() == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (row.getFromTime() == null || row.getToTime() == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null 
for checkpoint");
+    }
+
+    try (PreparedStatement ps = 
connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, row.getTableName());
+      ps.setString(2, row.getTargetCluster());
+      ps.setString(3, row.getType().name());
+      ps.setLong(4, row.getFromTime());
+      ps.setLong(5, row.getToTime());
+      ps.setString(6, row.getTenantId());
+      ps.setBytes(7, row.getStartRowKey());
+      ps.setBytes(8, row.getEndRowKey());
+      ps.setBoolean(9, row.getIsDryRun());
+      ps.setTimestamp(10, row.getExecutionStartTime());
+      ps.setTimestamp(11, row.getExecutionEndTime());
+      ps.setString(12, row.getStatus() != null ? row.getStatus().name() : 
null);
+      ps.setString(13, row.getCounters());
+      ps.executeUpdate();
+      connection.commit();
+    }
+  }
+
+  /**
+   * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat 
to filter out
+   * already-processed regions.
+   * @param tableName     Source table name
+   * @param targetCluster Target cluster ZK quorum
+   * @param fromTime      Start timestamp
+   * @param toTime        End timestamp
+   * @param tenantId      Tenant ID
+   * @return List of completed mapper regions
+   */
+  public List<PhoenixSyncTableCheckpointOutputRow> 
getProcessedMapperRegions(String tableName,
+    String targetCluster, Long fromTime, Long toTime, String tenantId) throws 
SQLException {
+
+    StringBuilder queryBuilder = new StringBuilder();
+    queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM ")
+      .append(SYNC_TABLE_CHECKPOINT_TABLE_NAME)
+      .append(" WHERE TABLE_NAME = ?  AND TARGET_CLUSTER = ?")
+      .append(" AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? ");
+
+    // Conditionally build TENANT_ID clause based on whether tenantId is null
+    if (tenantId == null) {
+      queryBuilder.append(" AND TENANT_ID IS NULL");
+    } else {
+      queryBuilder.append(" AND TENANT_ID = ?");
+    }
+
+    queryBuilder.append(
+      " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, 
TENANT_ID, START_ROW_KEY");
+
+    List<PhoenixSyncTableCheckpointOutputRow> results = new ArrayList<>();
+    try (PreparedStatement ps = 
connection.prepareStatement(queryBuilder.toString())) {
+      int paramIndex = 1;
+      ps.setString(paramIndex++, tableName);
+      ps.setString(paramIndex++, targetCluster);
+      ps.setString(paramIndex++, Type.REGION.name());
+      ps.setLong(paramIndex++, fromTime);
+      ps.setLong(paramIndex++, toTime);
+      // Only bind tenantId parameter if non-null
+      if (tenantId != null) {
+        ps.setString(paramIndex, tenantId);
+      }
+      try (ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+          PhoenixSyncTableCheckpointOutputRow row =
+            new PhoenixSyncTableCheckpointOutputRow.Builder()
+              
.setStartRowKey(rs.getBytes("START_ROW_KEY")).setEndRowKey(rs.getBytes("END_ROW_KEY"))
+              .build();
+          results.add(row);
+        }
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip 
already-processed chunks.
+   * @param tableName         Source table name
+   * @param targetCluster     Target cluster ZK quorum
+   * @param fromTime          Start timestamp
+   * @param toTime            End timestamp
+   * @param tenantId          Tenant ID
+   * @param mapperRegionStart Mapper region start key
+   * @param mapperRegionEnd   Mapper region end key
+   * @return List of processed chunks in the region
+   */
+  public List<PhoenixSyncTableCheckpointOutputRow> getProcessedChunks(String 
tableName,
+    String targetCluster, Long fromTime, Long toTime, String tenantId, byte[] 
mapperRegionStart,
+    byte[] mapperRegionEnd) throws SQLException {
+    StringBuilder queryBuilder = new StringBuilder();
+    queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? "
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?");
+
+    // Conditionally build TENANT_ID clause based on whether tenantId is null
+    if (tenantId == null) {
+      queryBuilder.append(" AND TENANT_ID IS NULL");
+    } else {
+      queryBuilder.append(" AND TENANT_ID = ?");
+    }
+
+    // Check if mapper region boundaries are non-empty (i.e., NOT first/last 
regions)
+    // Only add boundary conditions for non-empty boundaries
+    boolean hasEndBoundary = mapperRegionEnd != null && mapperRegionEnd.length 
> 0;
+    boolean hasStartBoundary = mapperRegionStart != null && 
mapperRegionStart.length > 0;
+
+    // Filter chunks that overlap with this mapper region:
+    // - Chunk overlaps if: chunkStart < mapperRegionEnd (when end boundary 
exists)
+    // - Chunk overlaps if: chunkEnd > mapperRegionStart (when start boundary 
exists)
+    if (hasEndBoundary) {
+      queryBuilder.append(" AND START_ROW_KEY <= ?");
+    }
+    if (hasStartBoundary) {
+      queryBuilder.append(" AND END_ROW_KEY >= ?");
+    }
+
+    queryBuilder.append(
+      " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, 
TENANT_ID, START_ROW_KEY");

Review Comment:
   Same question as above on the need for ORDER BY.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java:
##########
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SHA256DigestUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper that acts as a driver for validating table data between source and 
target clusters. The
+ * actual work of chunking and hashing is done server-side by the coprocessor. 
This mapper fetches
+ * chunk hashes from both clusters, compares them and write to checkpoint 
table.
+ */
+public class PhoenixSyncTableMapper
+  extends Mapper<NullWritable, DBInputFormat.NullDBWritable, NullWritable, 
NullWritable> {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixSyncTableMapper.class);
+
+  public enum SyncCounters {
+    MAPPERS_VERIFIED,
+    MAPPERS_MISMATCHED,
+    CHUNKS_VERIFIED,
+    CHUNKS_MISMATCHED,
+    SOURCE_ROWS_PROCESSED,
+    TARGET_ROWS_PROCESSED
+  }
+
+  private String tableName;
+  private String targetZkQuorum;
+  private Long fromTime;
+  private Long toTime;
+  private String tenantId;
+  private boolean isDryRun;
+  private long chunkSizeBytes;
+  private boolean isRawScan;
+  private boolean isReadAllVersions;
+  private Configuration conf;
+  private Connection sourceConnection;
+  private Connection targetConnection;
+  private Connection globalConnection;
+  private PTable pTable;
+  private byte[] physicalTableName;
+  private byte[] mapperRegionStart;
+  private byte[] mapperRegionEnd;
+  private PhoenixSyncTableOutputRepository syncTableOutputRepository;
+  private Timestamp mapperStartTime;
+
+  @Override
+  protected void setup(Context context) throws InterruptedException {
+    try {
+      super.setup(context);
+      mapperStartTime = new Timestamp(System.currentTimeMillis());
+      this.conf = context.getConfiguration();
+      tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf);
+      targetZkQuorum = 
PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf);
+      fromTime = PhoenixSyncTableTool.getPhoenixSyncTableFromTime(conf);
+      toTime = PhoenixSyncTableTool.getPhoenixSyncTableToTime(conf);
+      tenantId = PhoenixConfigurationUtil.getTenantId(conf);
+      isDryRun = PhoenixSyncTableTool.getPhoenixSyncTableDryRun(conf);
+      chunkSizeBytes = 
PhoenixSyncTableTool.getPhoenixSyncTableChunkSizeBytes(conf);
+      isRawScan = PhoenixSyncTableTool.getPhoenixSyncTableRawScan(conf);
+      isReadAllVersions = 
PhoenixSyncTableTool.getPhoenixSyncTableReadAllVersions(conf);
+      extractRegionBoundariesFromSplit(context);
+      sourceConnection = ConnectionUtil.getInputConnection(conf);
+      pTable = 
sourceConnection.unwrap(PhoenixConnection.class).getTable(tableName);
+      physicalTableName = pTable.getPhysicalName().getBytes();
+      connectToTargetCluster();
+      globalConnection = createGlobalConnection(conf);
+      syncTableOutputRepository = new 
PhoenixSyncTableOutputRepository(globalConnection);
+    } catch (SQLException | IOException e) {
+      tryClosingResources();
+      throw new RuntimeException(
+        String.format("Failed to setup PhoenixSyncTableMapper for table: %s", 
tableName), e);
+    }
+  }
+
+  /**
+   * Extracts mapper region boundaries from the PhoenixInputSplit
+   */
+  private void extractRegionBoundariesFromSplit(Context context) {
+    PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit();
+    KeyRange keyRange = split.getKeyRange();
+    if (keyRange == null) {
+      throw new IllegalStateException(String.format(
+        "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine 
region boundaries for sync operation.",
+        tableName));
+    }
+    mapperRegionStart = keyRange.getLowerRange();
+    mapperRegionEnd = keyRange.getUpperRange();
+  }
+
+  /**
+   * Connects to the target cluster using the target ZK quorum, port, znode, 
krb principal
+   */
+  private void connectToTargetCluster() throws SQLException, IOException {
+    Configuration targetConf = HBaseConfiguration.createClusterConf(conf, 
targetZkQuorum);
+    targetConnection = ConnectionUtil.getInputConnection(targetConf);
+  }
+
+  /**
+   * Creates a global (non-tenant) connection for the checkpoint table.
+   */
+  private Connection createGlobalConnection(Configuration conf) throws 
SQLException {
+    Configuration globalConf = new Configuration(conf);
+    globalConf.unset(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);

Review Comment:
   Isn't the tenant ID optional? If it is not present, perhaps you could reuse 
the same connection for both instead of creating a separate global one?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * InputFormat designed for PhoenixSyncTableTool that generates splits based 
on HBase region
+ * boundaries. Filters out already-processed mapper regions using checkpoint 
data, enabling
+ * resumable sync jobs. Uses {@link PhoenixNoOpSingleRecordReader} to invoke 
the mapper once per
+ * split (region).
+ */
+public class PhoenixSyncTableInputFormat extends PhoenixInputFormat {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class);
+
/**
 * No-arg constructor; the MapReduce framework instantiates the InputFormat
 * reflectively, so this must remain public.
 */
public PhoenixSyncTableInputFormat() {
  super();
}
+
/**
 * Returns a {@link PhoenixNoOpSingleRecordReader} that emits exactly one dummy record per split.
 * <p>
 * PhoenixSyncTableMapper doesn't need actual row data from the RecordReader - it extracts region
 * boundaries from the InputSplit and delegates all scanning to the PhoenixSyncTableRegionScanner
 * coprocessor. Using PhoenixNoOpSingleRecordReader ensures that {@code map()} is called exactly
 * once per region no matter what scan looks like, avoiding the overhead of the default
 * PhoenixRecordReader which would call {@code map()} for every row of scan.
 * @param split   Input Split
 * @param context Task attempt context (unused by this implementation)
 * @return A PhoenixNoOpSingleRecordReader instance
 */
@SuppressWarnings("rawtypes")
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
  return new PhoenixNoOpSingleRecordReader();
}
+
+  /**
+   * Generates InputSplits for the Phoenix sync table job, splits are done 
based on region boundary
+   * and then filter out already-completed regions using sync table checkpoint 
table.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf);
+    String targetZkQuorum = 
PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf);
+    Long fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf);
+    Long toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf);
+    List<InputSplit> allSplits = super.getSplits(context);
+    if (allSplits == null || allSplits.isEmpty()) {
+      throw new IOException(String.format(
+        "PhoenixInputFormat generated no splits for table %s. Check table 
exists and has regions.",
+        tableName));
+    }
+    LOGGER.info("Total splits generated {} of table {} for PhoenixSyncTable ", 
allSplits.size(),
+      tableName);
+    List<KeyRange> completedRegions;
+    try {
+      completedRegions =
+        queryCompletedMapperRegions(conf, tableName, targetZkQuorum, fromTime, 
toTime);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+    if (completedRegions.isEmpty()) {
+      LOGGER.info("No completed regions for table {} - processing all {} 
splits", tableName,
+        allSplits.size());
+      return allSplits;
+    }
+
+    List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits, 
completedRegions);
+    LOGGER.info("Found {} completed mapper regions for table {}, {} 
unprocessed splits remaining",
+      completedRegions.size(), tableName, unprocessedSplits.size());
+    return unprocessedSplits;
+  }
+
+  /**
+   * Queries Sync checkpoint table for completed mapper regions
+   */
+  private List<KeyRange> queryCompletedMapperRegions(Configuration conf, 
String tableName,
+    String targetZkQuorum, Long fromTime, Long toTime) throws SQLException {
+    List<KeyRange> completedRegions = new ArrayList<>();
+    try (Connection conn = ConnectionUtil.getInputConnection(conf)) {
+      PhoenixSyncTableOutputRepository repository = new 
PhoenixSyncTableOutputRepository(conn);
+      List<PhoenixSyncTableOutputRow> completedRows =
+        repository.getProcessedMapperRegions(tableName, targetZkQuorum, 
fromTime, toTime);
+      for (PhoenixSyncTableOutputRow row : completedRows) {
+        KeyRange keyRange = KeyRange.getKeyRange(row.getStartRowKey(), 
row.getEndRowKey());
+        completedRegions.add(keyRange);
+      }
+    }
+    return completedRegions;
+  }
+
+  /**
+   * Filters out splits that are fully contained within already completed 
mapper region boundary.
+   * @param allSplits        All splits generated from region boundaries
+   * @param completedRegions Regions already verified (from checkpoint table)
+   * @return Splits that need processing
+   */
+  @VisibleForTesting
+  List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
+    List<KeyRange> completedRegions) {
+    allSplits.sort((s1, s2) -> {
+      PhoenixInputSplit ps1 = (PhoenixInputSplit) s1;
+      PhoenixInputSplit ps2 = (PhoenixInputSplit) s2;
+      return KeyRange.COMPARATOR.compare(ps1.getKeyRange(), ps2.getKeyRange());
+    });
+    List<InputSplit> unprocessedSplits = new ArrayList<>();
+    int splitIdx = 0;
+    int completedIdx = 0;
+
+    // Two pointer comparison across splitRange and completedRange
+    while (splitIdx < allSplits.size() && completedIdx < 
completedRegions.size()) {

Review Comment:
   Won't the results be sorted in the PK order already? I see that the new 
commit adds ORDER BY, but not sure why that is required.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * InputFormat designed for PhoenixSyncTableTool that generates splits based 
on HBase region
+ * boundaries. Filters out already-processed mapper regions using checkpoint 
data, enabling
+ * resumable sync jobs. Uses {@link PhoenixNoOpSingleRecordReader} to invoke 
the mapper once per
+ * split (region).
+ */
+public class PhoenixSyncTableInputFormat extends PhoenixInputFormat {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class);
+
/**
 * No-arg constructor; the MapReduce framework instantiates the InputFormat
 * reflectively, so this must remain public.
 */
public PhoenixSyncTableInputFormat() {
  super();
}
+
/**
 * Returns a {@link PhoenixNoOpSingleRecordReader} that emits exactly one dummy record per split.
 * <p>
 * PhoenixSyncTableMapper doesn't need actual row data from the RecordReader - it extracts region
 * boundaries from the InputSplit and delegates all scanning to the PhoenixSyncTableRegionScanner
 * coprocessor. Using PhoenixNoOpSingleRecordReader ensures that {@code map()} is called exactly
 * once per region no matter what scan looks like, avoiding the overhead of the default
 * PhoenixRecordReader which would call {@code map()} for every row of scan.
 * @param split   Input Split
 * @param context Task attempt context (unused by this implementation)
 * @return A PhoenixNoOpSingleRecordReader instance
 */
@SuppressWarnings("rawtypes")
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
  return new PhoenixNoOpSingleRecordReader();
}
+
+  /**
+   * Generates InputSplits for the Phoenix sync table job, splits are done 
based on region boundary
+   * and then filter out already-completed regions using sync table checkpoint 
table.
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext context) throws IOException, 
InterruptedException {
+    Configuration conf = context.getConfiguration();
+    String tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf);
+    String targetZkQuorum = 
PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf);
+    Long fromTime = PhoenixSyncTableTool.getPhoenixSyncTableFromTime(conf);
+    Long toTime = PhoenixSyncTableTool.getPhoenixSyncTableToTime(conf);
+    List<InputSplit> allSplits = super.getSplits(context);

Review Comment:
   Why not cast once here and avoid casting at multiple places later?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java:
##########
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixMRJobUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
+
+/**
+ * A MapReduce tool for verifying and detecting data inconsistencies between 
Phoenix tables across
+ * two HBase clusters (source and target).
+ * <h2>Use Case</h2> This tool is designed for replication/migration 
verification scenarios where
+ * data is replicated from a source Phoenix cluster to a target cluster. It 
efficiently detects
+ * which data chunks are out of sync without transferring all the data over 
the network.
+ * <h2>How It Works</h2>
+ * <ol>
+ * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the 
table into mapper
+ * regions based on HBase region boundaries.</li>
+ * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on 
both source and
+ * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor 
accumulates rows into
+ * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all 
row data (keys +
+ * column families + qualifiers + timestamps + values).</li>
+ * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives 
chunk metadata (start
+ * key, end key, row count, hash) from both clusters and compares the hashes. 
Matching hashes mean
+ * the chunk data is identical; mismatched hashes indicate inconsistency.</li>
+ * <li><b>Result Tracking:</b> Results are checkpointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT}
+ * table, tracking verified chunks, mismatched chunks, and processing progress for resumable
+ * operations.</li>
+ * </ol>
+ * <h2>Usage Example</h2>
+ *
+ * <pre>
+ * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \ --table-name 
MY_TABLE \
+ * --target-cluster target-zk1,target-zk2:2181:/hbase
+ * </pre>
+ */
+public class PhoenixSyncTableTool extends Configured implements Tool {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixSyncTableTool.class);
+
+  private static final Option SCHEMA_NAME_OPTION =
+    new Option("s", "schema", true, "Phoenix schema name (optional)");
+  private static final Option TABLE_NAME_OPTION =
+    new Option("tn", "table-name", true, "Table name (mandatory)");
+  private static final Option TARGET_CLUSTER_OPTION =
+    new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum 
(mandatory)");
+  private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", 
true,
+    "Start time in milliseconds for sync (optional, defaults to 0)");
+  private static final Option TO_TIME_OPTION = new Option("tt", "to-time", 
true,
+    "End time in milliseconds for sync (optional, defaults to current time - 1 
hour)");
+  private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", 
false,
+    "Dry run mode - only checkpoint inconsistencies, do not repair 
(optional)");
+  private static final Option CHUNK_SIZE_OPTION =
+    new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, 
defaults to 1GB)");
+  private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", 
"run-foreground", false,
+    "Run the job in foreground. Default - Runs the job in background.");
+  private static final Option TENANT_ID_OPTION =
+    new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific 
table sync (optional)");
+  private static final Option HELP_OPTION = new Option("h", "help", false, 
"Help");
+
+  private String schemaName;
+  private String tableName;
+  private String targetZkQuorum;
+  private Long startTime;
+  private Long endTime;
+  private boolean isDryRun;
+  private Long chunkSizeBytes;
+  private boolean isForeground;
+  private String tenantId;
+
+  private String qTable;
+  private String qSchemaName;
+
+  private Configuration configuration;
+  private Job job;
+  private PTable pTable;
+
/**
 * Creates an MR job that uses server-side chunking and checksum calculation.
 * Applies timeout/retry overrides and sync-tool parameters to the configuration
 * before the Job is instantiated, then wires up input/output formats and
 * credentials for both clusters.
 * @param tableType type of the Phoenix table being synced (used to decide the
 *        query hint for input splitting)
 * @return Configured MapReduce job ready for submission
 * @throws Exception if job creation fails
 */
private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception {
  // Mutate the configuration first: Job.getInstance snapshots it.
  configureTimeoutsAndRetries(configuration);
  setPhoenixSyncTableToolConfiguration(configuration);
  PhoenixMRJobUtil.updateCapacityQueueInfo(configuration);
  Job job = Job.getInstance(configuration, getJobName());
  job.setMapperClass(PhoenixSyncTableMapper.class);
  job.setJarByClass(PhoenixSyncTableTool.class);
  // Credentials for the source (local) cluster.
  TableMapReduceUtil.initCredentials(job);
  TableMapReduceUtil.addDependencyJars(job);
  configureInput(job, tableType);
  configureOutput(job);
  // Cross-cluster kerberos tokens for the target cluster.
  obtainTargetClusterTokens(job);
  return job;
}
+
/**
 * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is
 * required for cross-cluster kerberos authentication.
 * @param job The MapReduce job to add tokens
 * @throws IOException if tokens cannot be obtained from the target cluster
 */
private void obtainTargetClusterTokens(Job job) throws IOException {
  Configuration targetConf =
    PhoenixMapReduceUtil.createConfigurationForZkQuorum(job.getConfiguration(), targetZkQuorum);
  TableMapReduceUtil.initCredentialsForCluster(job, targetConf);
}
+
+  /**
+   * Configures timeouts and retry settings for the sync job
+   */
+  private void configureTimeoutsAndRetries(Configuration configuration) {
+    long syncTableQueryTimeoutMs =
+      configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT);
+    long syncTableRPCTimeoutMs = 
configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB,
+      QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT);
+    long syncTableClientScannerTimeoutMs =
+      
configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT);
+    int syncTableRpcRetriesCounter =
+      configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER,
+        QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER);
+
+    configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      Long.toString(syncTableClientScannerTimeoutMs));
+    configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, 
Long.toString(syncTableRPCTimeoutMs));
+    configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      Integer.toString(syncTableRpcRetriesCounter));
+    configuration.set(MRJobConfig.TASK_TIMEOUT, 
Long.toString(syncTableQueryTimeoutMs));
+  }
+
/**
 * Writes all sync-tool parameters (table, target cluster, time range, dry-run
 * flag, optional chunk size and tenant id) into the job configuration so the
 * input format and mappers can read them back.
 */
private void setPhoenixSyncTableToolConfiguration(Configuration configuration) {
  PhoenixConfigurationUtil.setPhoenixSyncTableName(configuration, qTable);
  PhoenixConfigurationUtil.setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum);
  PhoenixConfigurationUtil.setPhoenixSyncTableFromTime(configuration, startTime);
  PhoenixConfigurationUtil.setPhoenixSyncTableToTime(configuration, endTime);
  PhoenixConfigurationUtil.setPhoenixSyncTableDryRun(configuration, isDryRun);
  // Disable guidepost-stats splitting so splits follow region boundaries.
  PhoenixConfigurationUtil.setSplitByStats(configuration, false);
  if (chunkSizeBytes != null) {
    PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
  }
  if (tenantId != null) {
    PhoenixConfigurationUtil.setTenantId(configuration, tenantId);
  }
  // Scan as of endTime (SCN) so both clusters are compared at the same point in time.
  PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
  configuration
    .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true);
}
+
+  private void configureInput(Job job, PTableType tableType) {
+    // With below query plan, we get Input split based on region boundary
+    String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ ";
+    String selectStatement = "SELECT " + hint + "1 FROM " + qTable;
+    PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class,
+      PhoenixSyncTableInputFormat.class, qTable, selectStatement);
+  }
+
/**
 * Configures the job output: a map-only job (zero reducers) that writes no MR
 * output, since results are checkpointed to a Phoenix table instead.
 */
private void configureOutput(Job job) {
  job.setNumReduceTasks(0);
  job.setOutputFormatClass(NullOutputFormat.class);
}
+
+  private String getJobName() {
+    StringBuilder jobName = new StringBuilder("PhoenixSyncTableTool");
+    if (qSchemaName != null) {
+      jobName.append("-").append(qSchemaName);
+    }
+    jobName.append("-").append(tableName);
+    jobName.append("-").append(System.currentTimeMillis());
+    return jobName.toString();
+  }
+
/**
 * Parses and validates the command line arguments.
 * <p>
 * Note: on a parse failure or when --help is given, this calls
 * printHelpAndExit, which terminates the JVM via System.exit and never
 * returns; that is why {@code cmdLine} cannot be dereferenced as null below.
 * @param args raw command line arguments
 * @return the parsed command line
 * @throws IllegalStateException if a mandatory option is missing
 */
public CommandLine parseOptions(String[] args) throws IllegalStateException {
  Options options = getOptions();
  CommandLineParser parser = DefaultParser.builder().setAllowPartialMatching(false)
    .setStripLeadingAndTrailingQuotes(false).build();
  CommandLine cmdLine = null;
  try {
    cmdLine = parser.parse(options, args);
  } catch (ParseException e) {
    LOGGER.error("Failed to parse command line options. Args: {}. Error: {}",
      Arrays.toString(args), e.getMessage(), e);
    printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
  }

  if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
    printHelpAndExit(options, 0);
  }
  requireOption(cmdLine, TABLE_NAME_OPTION);
  requireOption(cmdLine, TARGET_CLUSTER_OPTION);
  return cmdLine;
}
+
+  private void requireOption(CommandLine cmdLine, Option option) {
+    if (!cmdLine.hasOption(option.getOpt())) {
+      throw new IllegalStateException(option.getLongOpt() + " is a mandatory 
parameter");
+    }
+  }
+
+  private Options getOptions() {
+    Options options = new Options();
+    options.addOption(SCHEMA_NAME_OPTION);
+    options.addOption(TABLE_NAME_OPTION);
+    options.addOption(TARGET_CLUSTER_OPTION);
+    options.addOption(FROM_TIME_OPTION);
+    options.addOption(TO_TIME_OPTION);
+    options.addOption(DRY_RUN_OPTION);
+    options.addOption(CHUNK_SIZE_OPTION);
+    options.addOption(RUN_FOREGROUND_OPTION);
+    options.addOption(TENANT_ID_OPTION);
+    options.addOption(HELP_OPTION);
+    return options;
+  }
+
/**
 * Prints the error message to stderr followed by the usage help, then exits
 * with a failure code. Never returns.
 */
private void printHelpAndExit(String errorMessage, Options options) {
  System.err.println(errorMessage);
  printHelpAndExit(options, -1);
}
+
/**
 * Prints the usage help (including an example invocation) and terminates the
 * JVM with the given exit code. Never returns.
 */
private void printHelpAndExit(Options options, int exitCode) {
  HelpFormatter formatter = new HelpFormatter();
  formatter.printHelp("hadoop jar phoenix-server.jar " + PhoenixSyncTableTool.class.getName(),
    "Synchronize a Phoenix table between source and target clusters", options,
    "\nExample usage:\n"
      + "hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n"
      + "  --table-name MY_TABLE \\\n" + "  --target-cluster <zk_quorum>:2181 \\\n"
      + "  --dry-run\n",
    true);
  System.exit(exitCode);
}
+
+  public void populateSyncTableToolAttributes(CommandLine cmdLine) {
+    tableName = cmdLine.getOptionValue(TABLE_NAME_OPTION.getOpt());
+    targetZkQuorum = cmdLine.getOptionValue(TARGET_CLUSTER_OPTION.getOpt());
+    schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+
+    if (cmdLine.hasOption(FROM_TIME_OPTION.getOpt())) {
+      startTime = 
Long.valueOf(cmdLine.getOptionValue(FROM_TIME_OPTION.getOpt()));
+    } else {
+      startTime = 0L;
+    }
+
+    if (cmdLine.hasOption(TO_TIME_OPTION.getOpt())) {
+      endTime = Long.valueOf(cmdLine.getOptionValue(TO_TIME_OPTION.getOpt()));
+    } else {
+      // Default endTime, current time - 1 hour
+      endTime = EnvironmentEdgeManager.currentTimeMillis() - (60 * 60 * 1000);
+    }
+
+    if (cmdLine.hasOption(CHUNK_SIZE_OPTION.getOpt())) {
+      chunkSizeBytes = 
Long.valueOf(cmdLine.getOptionValue(CHUNK_SIZE_OPTION.getOpt()));
+      if (chunkSizeBytes <= 0) {
+        throw new IllegalArgumentException(
+          "Chunk size must be a positive value, got: " + chunkSizeBytes);
+      }
+    }
+    if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) {
+      tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
+    }
+    isDryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt());
+    isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+    qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
+    qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
+    PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
+    PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, 
qTable);

Review Comment:
   I just saw your other comments on setRaw(). I agree that if we don't do a raw 
scan, then we don't have to worry about these aspects, but if the HA guarantee 
includes the history (e.g., CDC), then do we have a choice?



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table 
stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level 
checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed 
chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = 
"PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 
days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, 
FROM_TIME, TO_TIME,"
+    + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, 
EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
/**
 * Creates a repository for managing sync table checkpoint operations. Note: The connection is
 * stored as-is and shared across operations. The caller retains ownership and is responsible for
 * connection lifecycle.
 * @param connection Phoenix connection (must remain open for repository lifetime)
 * @throws NullPointerException if {@code connection} is null
 */
public PhoenixSyncTableOutputRepository(Connection connection) {
  // Fail fast here instead of deferring the NPE to the first repository call.
  if (connection == null) {
    throw new NullPointerException("connection must not be null");
  }
  this.connection = connection;
}
+
/**
 * Creates the PHOENIX_SYNC_TABLE_CHECKPOINT table if it does not already exist.
 * <p>
 * The primary key (table name, target cluster, type, time range, tenant id,
 * start row key) identifies one checkpoint entry; rows expire via the table
 * TTL (90 days). The commit after DDL follows the connection's explicit-commit
 * usage in this repository.
 * @throws SQLException if the DDL execution fails
 */
public void createSyncCheckpointTableIfNotExists() throws SQLException {
  String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
    + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT NULL,\n"
    + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
    + "    TO_TIME BIGINT NOT NULL,\n" + "    TENANT_ID VARCHAR,\n"
    + "    START_ROW_KEY VARBINARY_ENCODED,\n" + "    END_ROW_KEY VARBINARY_ENCODED,\n"
    + "    IS_DRY_RUN BOOLEAN, \n" + "    EXECUTION_START_TIME TIMESTAMP,\n"
    + "    EXECUTION_END_TIME TIMESTAMP,\n" + "    STATUS VARCHAR(20),\n"
    + "    COUNTERS VARCHAR, \n" + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n"
    + "        TARGET_CLUSTER,\n" + "        TYPE ,\n" + "        FROM_TIME,\n"
    + "        TO_TIME,\n" + "        TENANT_ID,\n" + "        START_ROW_KEY )" + ") TTL="
    + OUTPUT_TABLE_TTL_SECONDS;

  try (Statement stmt = connection.createStatement()) {
    stmt.execute(ddl);
    connection.commit();
    LOGGER.info("Successfully created or verified existence of {} table",
      SYNC_TABLE_CHECKPOINT_TABLE_NAME);
  }
}
+
+  public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow 
row)
+    throws SQLException {
+
+    // Validate required parameters
+    if (row.getTableName() == null || row.getTableName().isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty 
for checkpoint");
+    }
+    if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or 
empty for checkpoint");
+    }
+    if (row.getType() == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (row.getFromTime() == null || row.getToTime() == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null 
for checkpoint");
+    }

Review Comment:
   Edited the comment to add the missing `Preconditions`.



##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Status;
import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table 
stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level 
checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed 
chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
  // Class-level SLF4J logger.
  private static final Logger LOGGER =
    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
  // Name of the Phoenix table that stores checkpoint rows for the sync table tool.
  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
  // Row TTL applied to the checkpoint table so stale checkpoints age out automatically.
  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
  // Shared Phoenix connection; lifecycle is owned by the caller, not this repository.
  private final Connection connection;
  // Parameterized UPSERT covering all checkpoint columns; values are bound positionally 1..12
  // in checkpointSyncTableResult.
  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
    + " START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: 
The connection is
+   * stored as-is and shared across operations. The caller retains ownership 
and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository 
lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
  /**
   * Creates the PHOENIX_SYNC_TABLE_CHECKPOINT table if it does not already exist. Idempotent:
   * safe to call on every tool run thanks to {@code IF NOT EXISTS}.
   * @throws SQLException if the DDL execution or commit fails
   */
  public void createSyncCheckpointTableIfNotExists() throws SQLException {
    // Primary key = (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, START_ROW_KEY), so
    // re-running the same chunk/region upserts over its previous checkpoint row.
    // NOTE(review): unlike the checkpoint row model (which carries a tenantId), this DDL has no
    // TENANT_ID column — confirm whether checkpoints from different tenants can collide here.
    String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
      + "    TABLE_NAME VARCHAR NOT NULL,\n" + "    TARGET_CLUSTER VARCHAR NOT NULL,\n"
      + "    TYPE VARCHAR(20) NOT NULL,\n" + "    FROM_TIME BIGINT NOT NULL,\n"
      + "    TO_TIME BIGINT NOT NULL,\n" + "    START_ROW_KEY VARBINARY_ENCODED,\n"
      + "    END_ROW_KEY VARBINARY_ENCODED,\n" + "    IS_DRY_RUN BOOLEAN, \n"
      + "    EXECUTION_START_TIME TIMESTAMP,\n" + "    EXECUTION_END_TIME TIMESTAMP,\n"
      + "    STATUS VARCHAR(20),\n" + "    COUNTERS VARCHAR(255), \n"
      + "    CONSTRAINT PK PRIMARY KEY (\n" + "        TABLE_NAME,\n" + "        TARGET_CLUSTER,\n"
      + "        TYPE ,\n" + "        FROM_TIME,\n" + "        TO_TIME,\n"
      + "        START_ROW_KEY )" + ") TTL=" + OUTPUT_TABLE_TTL_SECONDS;

    try (Statement stmt = connection.createStatement()) {
      stmt.execute(ddl);
      // Commit on the shared connection; the caller's other work is committed along with it.
      connection.commit();
      LOGGER.info("Successfully created or verified existence of {} table",
        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
    }
  }
+
+  public void checkpointSyncTableResult(String tableName, String 
targetCluster, Type type,
+    Long fromTime, Long toTime, boolean isDryRun, byte[] startKey, byte[] 
endKey, Status status,
+    Timestamp executionStartTime, Timestamp executionEndTime, String counters) 
throws SQLException {
+
+    // Validate required parameters
+    if (tableName == null || tableName.isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty 
for checkpoint");
+    }
+    if (targetCluster == null || targetCluster.isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or 
empty for checkpoint");
+    }
+    if (type == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (fromTime == null || toTime == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null 
for checkpoint");
+    }
+
+    try (PreparedStatement ps = 
connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, tableName);
+      ps.setString(2, targetCluster);
+      ps.setString(3, type.name());
+      ps.setLong(4, fromTime);
+      ps.setLong(5, toTime);
+      ps.setBytes(6, startKey);
+      ps.setBytes(7, endKey);
+      ps.setBoolean(8, isDryRun);
+      ps.setTimestamp(9, executionStartTime);
+      ps.setTimestamp(10, executionEndTime);
+      ps.setString(11, status != null ? status.name() : null);
+      ps.setString(12, counters);
+      ps.executeUpdate();
+      connection.commit();
+    }
+  }
+
+  /**
+   * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat 
to filter out
+   * already-processed regions.
+   * @param tableName     Source table name
+   * @param targetCluster Target cluster ZK quorum
+   * @param fromTime      Start timestamp (nullable)
+   * @param toTime        End timestamp (nullable)
+   * @return List of completed mapper regions
+   */
+  public List<PhoenixSyncTableOutputRow> getProcessedMapperRegions(String 
tableName,
+    String targetCluster, Long fromTime, Long toTime) throws SQLException {
+
+    String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ?  AND TARGET_CLUSTER = ?"
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, 
?)";
+    List<PhoenixSyncTableOutputRow> results = new ArrayList<>();
+    try (PreparedStatement ps = connection.prepareStatement(query)) {
+      int paramIndex = 1;
+      ps.setString(paramIndex++, tableName);
+      ps.setString(paramIndex++, targetCluster);
+      ps.setString(paramIndex++, Type.MAPPER_REGION.name());
+      ps.setLong(paramIndex++, fromTime);
+      ps.setLong(paramIndex++, toTime);
+      ps.setString(paramIndex++, Status.VERIFIED.name());
+      ps.setString(paramIndex, Status.MISMATCHED.name());
+      try (ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+          PhoenixSyncTableOutputRow row =
+            new 
PhoenixSyncTableOutputRow.Builder().setStartRowKey(rs.getBytes("START_ROW_KEY"))
+              .setEndRowKey(rs.getBytes("END_ROW_KEY")).build();
+          results.add(row);
+        }
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip 
already-processed chunks.
+   * @param tableName         Source table name
+   * @param targetCluster     Target cluster ZK quorum
+   * @param fromTime          Start timestamp (nullable)
+   * @param toTime            End timestamp (nullable)
+   * @param mapperRegionStart Mapper region start key
+   * @param mapperRegionEnd   Mapper region end key
+   * @return List of processed chunks in the region
+   */
+  public List<PhoenixSyncTableOutputRow> getProcessedChunks(String tableName, 
String targetCluster,
+    Long fromTime, Long toTime, byte[] mapperRegionStart, byte[] 
mapperRegionEnd)
+    throws SQLException {
+    StringBuilder queryBuilder = new StringBuilder();
+    queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM " + 
SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? "
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?");
+
+    // Check if mapper region boundaries are non-empty (i.e., NOT first/last 
regions)
+    // Only add boundary conditions for non-empty boundaries
+    boolean hasEndBoundary = mapperRegionEnd != null && mapperRegionEnd.length 
> 0;
+    boolean hasStartBoundary = mapperRegionStart != null && 
mapperRegionStart.length > 0;
+
+    // Filter chunks that overlap with this mapper region:
+    // - Chunk overlaps if: chunkStart < mapperRegionEnd (when end boundary 
exists)
+    // - Chunk overlaps if: chunkEnd > mapperRegionStart (when start boundary 
exists)
+    if (hasEndBoundary) {
+      queryBuilder.append(" AND START_ROW_KEY <= ?");
+    }
+    if (hasStartBoundary) {
+      queryBuilder.append(" AND END_ROW_KEY >= ?");

Review Comment:
   Perhaps we should add `" AND START_ROW_KEY > <0x00>"`? You may want to compare the query plan 
with and without this constraint to see whether it actually helps.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to