haridsv commented on code in PR #2379: URL: https://github.com/apache/phoenix/pull/2379#discussion_r3004205492
########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableCheckpointOutputRow.java: ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Objects; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Data model class representing required row in the PHOENIX_SYNC_TABLE_CHECKPOINT table + */ +public class PhoenixSyncTableCheckpointOutputRow { + + public enum Type { + CHUNK, + REGION + } + + public enum Status { + VERIFIED, + MISMATCHED + } + + private String tableName; + private String targetCluster; + private Type type; + private Long fromTime; + private Long toTime; + private String tenantId; + private Boolean isDryRun; + private byte[] startRowKey; + private byte[] endRowKey; + private Timestamp executionStartTime; + private Timestamp executionEndTime; + private Status status; + private String counters; + + @Override + public String toString() { + return String.format("SyncOutputRow[table=%s, target=%s, type=%s, start=%s, end=%s, status=%s]", + tableName, targetCluster, type, Bytes.toStringBinary(startRowKey), + Bytes.toStringBinary(endRowKey), status); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PhoenixSyncTableCheckpointOutputRow that = (PhoenixSyncTableCheckpointOutputRow) o; + return Objects.equals(tableName, that.tableName) + && Objects.equals(targetCluster, that.targetCluster) && type == that.type + && Objects.equals(fromTime, that.fromTime) && Objects.equals(toTime, that.toTime) + && Objects.equals(tenantId, that.tenantId) && Objects.equals(isDryRun, that.isDryRun) + && Arrays.equals(startRowKey, that.startRowKey) && Arrays.equals(endRowKey, that.endRowKey) + && Objects.equals(executionStartTime, that.executionStartTime) + && Objects.equals(executionEndTime, that.executionEndTime) && status == that.status + && Objects.equals(counters, that.counters); + } + + @Override + public int hashCode() { + int result = Objects.hash(tableName, targetCluster, type, fromTime, toTime, tenantId, isDryRun, + executionStartTime, executionEndTime, status, counters); + result = 31 * result + Arrays.hashCode(startRowKey); + result = 31 * result + Arrays.hashCode(endRowKey); + return result; + } + + public String getTableName() { + return tableName; + } + + public String getTargetCluster() { + return targetCluster; + } + + public Type getType() { + return type; + } + + public Long getFromTime() { + return fromTime; + } + + public Long 
getToTime() { + return toTime; + } + + public String getTenantId() { + return tenantId; + } + + public Boolean getIsDryRun() { + return isDryRun; + } + + public byte[] getStartRowKey() { + return startRowKey != null ? Arrays.copyOf(startRowKey, startRowKey.length) : null; + } + + public byte[] getEndRowKey() { + return endRowKey != null ? Arrays.copyOf(endRowKey, endRowKey.length) : null; + } + + public Timestamp getExecutionStartTime() { + return executionStartTime; + } + + public Timestamp getExecutionEndTime() { + return executionEndTime; + } + + public Status getStatus() { + return status; + } + + public String getCounters() { + return counters; + } + + @VisibleForTesting + public long getSourceRowsProcessed() { + return parseCounterValue(PhoenixSyncTableMapper.SyncCounters.SOURCE_ROWS_PROCESSED.name()); + } + + @VisibleForTesting + public long getTargetRowsProcessed() { + return parseCounterValue(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED.name()); + } + + private long parseCounterValue(String counterName) { + if (counters == null || counters.isEmpty()) { + return 0; + } + + String[] pairs = counters.split(","); + for (String pair : pairs) { + String[] keyValue = pair.split("="); + if (keyValue.length == 2 && keyValue[0].trim().equals(counterName)) { + return Long.parseLong(keyValue[1].trim()); + } + } + return 0; + } Review Comment: The counters string is generated outside the class, while parsing is done inside; this mismatch in abstraction is not good. I suggest you either encapsulate both in this class or move both into one util class.
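A minimal sketch of that encapsulation, keeping both directions of the counters encoding in PhoenixSyncTableCheckpointOutputRow (the method name serializeCounters is hypothetical; the "NAME=value,NAME=value" layout is inferred from parseCounterValue above):

    // Hypothetical inverse of parseCounterValue. With both directions living
    // in one class, the counters encoding cannot silently drift.
    public static String serializeCounters(java.util.Map<String, Long> counterValues) {
      StringBuilder sb = new StringBuilder();
      for (java.util.Map.Entry<String, Long> entry : counterValues.entrySet()) {
        if (sb.length() > 0) {
          sb.append(',');
        }
        sb.append(entry.getKey()).append('=').append(entry.getValue());
      }
      return sb.toString();
    }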
########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2.
Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. + * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row) + throws SQLException { + + // Validate required parameters + if (row.getTableName() == null || row.getTableName().isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (row.getType() == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (row.getFromTime() == null || row.getToTime() == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } + + try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) { + ps.setString(1, row.getTableName()); + ps.setString(2, row.getTargetCluster()); + ps.setString(3, row.getType().name()); + ps.setLong(4, row.getFromTime()); + ps.setLong(5, row.getToTime()); + ps.setString(6, row.getTenantId()); + ps.setBytes(7, row.getStartRowKey()); + ps.setBytes(8, row.getEndRowKey()); + ps.setBoolean(9, row.getIsDryRun()); + ps.setTimestamp(10, row.getExecutionStartTime()); + ps.setTimestamp(11, row.getExecutionEndTime()); + ps.setString(12, row.getStatus() 
!= null ? row.getStatus().name() : null); + ps.setString(13, row.getCounters()); + ps.executeUpdate(); + connection.commit(); + } + } + + /** + * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out + * already-processed regions. + * @param tableName Source table name + * @param targetCluster Target cluster ZK quorum + * @param fromTime Start timestamp + * @param toTime End timestamp + * @param tenantId Tenant ID + * @return List of completed mapper regions + */ + public List<PhoenixSyncTableCheckpointOutputRow> getProcessedMapperRegions(String tableName, + String targetCluster, Long fromTime, Long toTime, String tenantId) throws SQLException { + + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM ") + .append(SYNC_TABLE_CHECKPOINT_TABLE_NAME) + .append(" WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?") + .append(" AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? "); + + // Conditionally build TENANT_ID clause based on whether tenantId is null + if (tenantId == null) { + queryBuilder.append(" AND TENANT_ID IS NULL"); + } else { + queryBuilder.append(" AND TENANT_ID = ?"); + } + + queryBuilder.append( + " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, TENANT_ID, START_ROW_KEY"); Review Comment: This is already the PK order, correct? Why do we need an explicit ORDER BY? ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. 
Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. + * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row) + throws SQLException { + + // Validate required parameters + if (row.getTableName() == null || row.getTableName().isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (row.getType() == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (row.getFromTime() == null || row.getToTime() == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } + + try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) { + ps.setString(1, row.getTableName()); + ps.setString(2, row.getTargetCluster()); + ps.setString(3, row.getType().name()); + ps.setLong(4, row.getFromTime()); + ps.setLong(5, row.getToTime()); + ps.setString(6, row.getTenantId()); + ps.setBytes(7, row.getStartRowKey()); + ps.setBytes(8, row.getEndRowKey()); + ps.setBoolean(9, row.getIsDryRun()); + ps.setTimestamp(10, row.getExecutionStartTime()); + ps.setTimestamp(11, row.getExecutionEndTime()); + ps.setString(12, row.getStatus() 
!= null ? row.getStatus().name() : null); + ps.setString(13, row.getCounters()); + ps.executeUpdate(); + connection.commit(); + } + } + + /** + * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out + * already-processed regions. + * @param tableName Source table name + * @param targetCluster Target cluster ZK quorum + * @param fromTime Start timestamp + * @param toTime End timestamp + * @param tenantId Tenant ID + * @return List of completed mapper regions + */ + public List<PhoenixSyncTableCheckpointOutputRow> getProcessedMapperRegions(String tableName, + String targetCluster, Long fromTime, Long toTime, String tenantId) throws SQLException { + + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM ") + .append(SYNC_TABLE_CHECKPOINT_TABLE_NAME) + .append(" WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?") + .append(" AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? "); + + // Conditionally build TENANT_ID clause based on whether tenantId is null + if (tenantId == null) { + queryBuilder.append(" AND TENANT_ID IS NULL"); + } else { + queryBuilder.append(" AND TENANT_ID = ?"); + } + + queryBuilder.append( + " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, TENANT_ID, START_ROW_KEY"); + + List<PhoenixSyncTableCheckpointOutputRow> results = new ArrayList<>(); + try (PreparedStatement ps = connection.prepareStatement(queryBuilder.toString())) { + int paramIndex = 1; + ps.setString(paramIndex++, tableName); + ps.setString(paramIndex++, targetCluster); + ps.setString(paramIndex++, Type.REGION.name()); + ps.setLong(paramIndex++, fromTime); + ps.setLong(paramIndex++, toTime); + // Only bind tenantId parameter if non-null + if (tenantId != null) { + ps.setString(paramIndex, tenantId); + } + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + PhoenixSyncTableCheckpointOutputRow row = + new PhoenixSyncTableCheckpointOutputRow.Builder() + .setStartRowKey(rs.getBytes("START_ROW_KEY")).setEndRowKey(rs.getBytes("END_ROW_KEY")) + .build(); + results.add(row); + } + } + } + return results; + } + + /** + * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip already-processed chunks. + * @param tableName Source table name + * @param targetCluster Target cluster ZK quorum + * @param fromTime Start timestamp + * @param toTime End timestamp + * @param tenantId Tenant ID + * @param mapperRegionStart Mapper region start key + * @param mapperRegionEnd Mapper region end key + * @return List of processed chunks in the region + */ + public List<PhoenixSyncTableCheckpointOutputRow> getProcessedChunks(String tableName, + String targetCluster, Long fromTime, Long toTime, String tenantId, byte[] mapperRegionStart, + byte[] mapperRegionEnd) throws SQLException { + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? " + + " AND TYPE = ? AND FROM_TIME = ? 
AND TO_TIME = ?"); + + // Conditionally build TENANT_ID clause based on whether tenantId is null + if (tenantId == null) { + queryBuilder.append(" AND TENANT_ID IS NULL"); + } else { + queryBuilder.append(" AND TENANT_ID = ?"); + } + + // Check if mapper region boundaries are non-empty (i.e., NOT first/last regions) + // Only add boundary conditions for non-empty boundaries + boolean hasEndBoundary = mapperRegionEnd != null && mapperRegionEnd.length > 0; + boolean hasStartBoundary = mapperRegionStart != null && mapperRegionStart.length > 0; + + // Filter chunks that overlap with this mapper region: + // - Chunk overlaps if: chunkStart < mapperRegionEnd (when end boundary exists) + // - Chunk overlaps if: chunkEnd > mapperRegionStart (when start boundary exists) + if (hasEndBoundary) { + queryBuilder.append(" AND START_ROW_KEY <= ?"); + } + if (hasStartBoundary) { + queryBuilder.append(" AND END_ROW_KEY >= ?"); + } + + queryBuilder.append( + " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, TENANT_ID, START_ROW_KEY"); Review Comment: Same question as above on the need for ORDER BY. ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java: ########## @@ -0,0 +1,741 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; + +import java.io.IOException; +import java.security.MessageDigest; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.SHA256DigestUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mapper that acts as a driver for validating table data between source and target clusters. The + * actual work of chunking and hashing is done server-side by the coprocessor. This mapper fetches + * chunk hashes from both clusters, compares them and write to checkpoint table. 
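To make the comparison step described above concrete, a minimal illustrative sketch (the helper name is hypothetical; the digests themselves are computed server-side by PhoenixSyncTableRegionScanner):

    // A chunk is in sync when both clusters report the same SHA-256 digest.
    static boolean chunkMatches(byte[] sourceChunkHash, byte[] targetChunkHash) {
      return java.security.MessageDigest.isEqual(sourceChunkHash, targetChunkHash);
    }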
+ */ +public class PhoenixSyncTableMapper + extends Mapper<NullWritable, DBInputFormat.NullDBWritable, NullWritable, NullWritable> { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableMapper.class); + + public enum SyncCounters { + MAPPERS_VERIFIED, + MAPPERS_MISMATCHED, + CHUNKS_VERIFIED, + CHUNKS_MISMATCHED, + SOURCE_ROWS_PROCESSED, + TARGET_ROWS_PROCESSED + } + + private String tableName; + private String targetZkQuorum; + private Long fromTime; + private Long toTime; + private String tenantId; + private boolean isDryRun; + private long chunkSizeBytes; + private boolean isRawScan; + private boolean isReadAllVersions; + private Configuration conf; + private Connection sourceConnection; + private Connection targetConnection; + private Connection globalConnection; + private PTable pTable; + private byte[] physicalTableName; + private byte[] mapperRegionStart; + private byte[] mapperRegionEnd; + private PhoenixSyncTableOutputRepository syncTableOutputRepository; + private Timestamp mapperStartTime; + + @Override + protected void setup(Context context) throws InterruptedException { + try { + super.setup(context); + mapperStartTime = new Timestamp(System.currentTimeMillis()); + this.conf = context.getConfiguration(); + tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf); + targetZkQuorum = PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf); + fromTime = PhoenixSyncTableTool.getPhoenixSyncTableFromTime(conf); + toTime = PhoenixSyncTableTool.getPhoenixSyncTableToTime(conf); + tenantId = PhoenixConfigurationUtil.getTenantId(conf); + isDryRun = PhoenixSyncTableTool.getPhoenixSyncTableDryRun(conf); + chunkSizeBytes = PhoenixSyncTableTool.getPhoenixSyncTableChunkSizeBytes(conf); + isRawScan = PhoenixSyncTableTool.getPhoenixSyncTableRawScan(conf); + isReadAllVersions = PhoenixSyncTableTool.getPhoenixSyncTableReadAllVersions(conf); + extractRegionBoundariesFromSplit(context); + sourceConnection = ConnectionUtil.getInputConnection(conf); + pTable = sourceConnection.unwrap(PhoenixConnection.class).getTable(tableName); + physicalTableName = pTable.getPhysicalName().getBytes(); + connectToTargetCluster(); + globalConnection = createGlobalConnection(conf); + syncTableOutputRepository = new PhoenixSyncTableOutputRepository(globalConnection); + } catch (SQLException | IOException e) { + tryClosingResources(); + throw new RuntimeException( + String.format("Failed to setup PhoenixSyncTableMapper for table: %s", tableName), e); + } + } + + /** + * Extracts mapper region boundaries from the PhoenixInputSplit + */ + private void extractRegionBoundariesFromSplit(Context context) { + PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit(); + KeyRange keyRange = split.getKeyRange(); + if (keyRange == null) { + throw new IllegalStateException(String.format( + "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine region boundaries for sync operation.", + tableName)); + } + mapperRegionStart = keyRange.getLowerRange(); + mapperRegionEnd = keyRange.getUpperRange(); + } + + /** + * Connects to the target cluster using the target ZK quorum, port, znode, krb principal + */ + private void connectToTargetCluster() throws SQLException, IOException { + Configuration targetConf = HBaseConfiguration.createClusterConf(conf, targetZkQuorum); + targetConnection = ConnectionUtil.getInputConnection(targetConf); + } + + /** + * Creates a global (non-tenant) connection for the checkpoint table. 
+ */ + private Connection createGlobalConnection(Configuration conf) throws SQLException { + Configuration globalConf = new Configuration(conf); + globalConf.unset(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); Review Comment: Isn't tenant ID optional? Perhaps you can use the same connection for both if it is not present? ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.KeyRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * InputFormat designed for PhoenixSyncTableTool that generates splits based on HBase region + * boundaries. Filters out already-processed mapper regions using checkpoint data, enabling + * resumable sync jobs. Uses {@link PhoenixNoOpSingleRecordReader} to invoke the mapper once per + * split (region). + */ +public class PhoenixSyncTableInputFormat extends PhoenixInputFormat { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class); + + public PhoenixSyncTableInputFormat() { + super(); + } + + /** + * Returns a {@link PhoenixNoOpSingleRecordReader} that emits exactly one dummy record per split. + * <p> + * PhoenixSyncTableMapper doesn't need actual row data from the RecordReader - it extracts region + * boundaries from the InputSplit and delegates all scanning to the PhoenixSyncTableRegionScanner + * coprocessor. Using PhoenixNoOpSingleRecordReader ensures that {@code map()} is called exactly + * once per region no matter what scan looks like, avoiding the overhead of the default + * PhoenixRecordReader which would call {@code map()} for every row of scan. 
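Returning to the tenant-ID review comment above on createGlobalConnection: a sketch of reusing the source connection when no tenant ID is configured (a hypothetical change to the mapper's setup(), not the PR's code):

    // Hypothetical: when no tenant ID is set, the source connection is already
    // global, so it could back the checkpoint repository as well.
    globalConnection = (tenantId == null) ? sourceConnection : createGlobalConnection(conf);
    // tryClosingResources() would then need to avoid closing the shared
    // connection twice.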
+ * @param split Input Split + * @return A PhoenixNoOpSingleRecordReader instance + */ + @SuppressWarnings("rawtypes") + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { + return new PhoenixNoOpSingleRecordReader(); + } + + /** + * Generates InputSplits for the Phoenix sync table job, splits are done based on region boundary + * and then filter out already-completed regions using sync table checkpoint table. + */ + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + String tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf); + String targetZkQuorum = PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf); + Long fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf); + Long toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf); + List<InputSplit> allSplits = super.getSplits(context); + if (allSplits == null || allSplits.isEmpty()) { + throw new IOException(String.format( + "PhoenixInputFormat generated no splits for table %s. Check table exists and has regions.", + tableName)); + } + LOGGER.info("Total splits generated {} of table {} for PhoenixSyncTable ", allSplits.size(), + tableName); + List<KeyRange> completedRegions; + try { + completedRegions = + queryCompletedMapperRegions(conf, tableName, targetZkQuorum, fromTime, toTime); + } catch (SQLException e) { + throw new RuntimeException(e); + } + if (completedRegions.isEmpty()) { + LOGGER.info("No completed regions for table {} - processing all {} splits", tableName, + allSplits.size()); + return allSplits; + } + + List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits, completedRegions); + LOGGER.info("Found {} completed mapper regions for table {}, {} unprocessed splits remaining", + completedRegions.size(), tableName, unprocessedSplits.size()); + return unprocessedSplits; + } + + /** + * Queries Sync checkpoint table for completed mapper regions + */ + private List<KeyRange> queryCompletedMapperRegions(Configuration conf, String tableName, + String targetZkQuorum, Long fromTime, Long toTime) throws SQLException { + List<KeyRange> completedRegions = new ArrayList<>(); + try (Connection conn = ConnectionUtil.getInputConnection(conf)) { + PhoenixSyncTableOutputRepository repository = new PhoenixSyncTableOutputRepository(conn); + List<PhoenixSyncTableOutputRow> completedRows = + repository.getProcessedMapperRegions(tableName, targetZkQuorum, fromTime, toTime); + for (PhoenixSyncTableOutputRow row : completedRows) { + KeyRange keyRange = KeyRange.getKeyRange(row.getStartRowKey(), row.getEndRowKey()); + completedRegions.add(keyRange); + } + } + return completedRegions; + } + + /** + * Filters out splits that are fully contained within already completed mapper region boundary. 
+ * @param allSplits All splits generated from region boundaries + * @param completedRegions Regions already verified (from checkpoint table) + * @return Splits that need processing + */ + @VisibleForTesting + List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits, + List<KeyRange> completedRegions) { + allSplits.sort((s1, s2) -> { + PhoenixInputSplit ps1 = (PhoenixInputSplit) s1; + PhoenixInputSplit ps2 = (PhoenixInputSplit) s2; + return KeyRange.COMPARATOR.compare(ps1.getKeyRange(), ps2.getKeyRange()); + }); + List<InputSplit> unprocessedSplits = new ArrayList<>(); + int splitIdx = 0; + int completedIdx = 0; + + // Two pointer comparison across splitRange and completedRange + while (splitIdx < allSplits.size() && completedIdx < completedRegions.size()) { Review Comment: Won't the results be sorted in the PK order already? I see that the new commit adds ORDER BY, but not sure why that is required. ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.KeyRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * InputFormat designed for PhoenixSyncTableTool that generates splits based on HBase region + * boundaries. Filters out already-processed mapper regions using checkpoint data, enabling + * resumable sync jobs. Uses {@link PhoenixNoOpSingleRecordReader} to invoke the mapper once per + * split (region). + */ +public class PhoenixSyncTableInputFormat extends PhoenixInputFormat { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class); + + public PhoenixSyncTableInputFormat() { + super(); + } + + /** + * Returns a {@link PhoenixNoOpSingleRecordReader} that emits exactly one dummy record per split. + * <p> + * PhoenixSyncTableMapper doesn't need actual row data from the RecordReader - it extracts region + * boundaries from the InputSplit and delegates all scanning to the PhoenixSyncTableRegionScanner + * coprocessor. 
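On the sorting question in the review comment above: rows scanned over the checkpoint table's leading PK columns should already arrive in row-key order, which is all a two-pointer walk needs. A generic sketch of such a containment filter, with hypothetical helpers contains() and endsBefore() (this is not the PR's actual loop body):

    // Both lists sorted ascending by start key; advance two cursors in step.
    while (splitIdx < allSplits.size() && completedIdx < completedRegions.size()) {
      KeyRange split = ((PhoenixInputSplit) allSplits.get(splitIdx)).getKeyRange();
      KeyRange done = completedRegions.get(completedIdx);
      if (contains(done, split)) {
        splitIdx++;            // fully verified already; drop the split
      } else if (endsBefore(done, split)) {
        completedIdx++;        // completed region lies behind; move on
      } else {
        unprocessedSplits.add(allSplits.get(splitIdx++));  // still needs work
      }
    }
    // (any splits left over after the loop still need processing)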
Using PhoenixNoOpSingleRecordReader ensures that {@code map()} is called exactly + * once per region no matter what scan looks like, avoiding the overhead of the default + * PhoenixRecordReader which would call {@code map()} for every row of scan. + * @param split Input Split + * @return A PhoenixNoOpSingleRecordReader instance + */ + @SuppressWarnings("rawtypes") + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) { + return new PhoenixNoOpSingleRecordReader(); + } + + /** + * Generates InputSplits for the Phoenix sync table job, splits are done based on region boundary + * and then filter out already-completed regions using sync table checkpoint table. + */ + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + String tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf); + String targetZkQuorum = PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf); + Long fromTime = PhoenixSyncTableTool.getPhoenixSyncTableFromTime(conf); + Long toTime = PhoenixSyncTableTool.getPhoenixSyncTableToTime(conf); + List<InputSplit> allSplits = super.getSplits(context); Review Comment: Why not cast once here and avoid casting at multiple places later? ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java: ########## @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
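On the cast-once review comment above in getSplits(): a sketch, assuming (as the code implies) that every split produced by PhoenixInputFormat is a PhoenixInputSplit:

    // Hypothetical: cast a single time up front, then use the typed list.
    List<PhoenixInputSplit> typedSplits = new ArrayList<>(allSplits.size());
    for (InputSplit split : allSplits) {
      typedSplits.add((PhoenixInputSplit) split);
    }
    typedSplits.sort((a, b) -> KeyRange.COMPARATOR.compare(a.getKeyRange(), b.getKeyRange()));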
+ */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Properties; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixMRJobUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException; + +/** + * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across + * two HBase clusters (source and target). + * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where + * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects + * which data chunks are out of sync without transferring all the data over the network. + * <h2>How It Works</h2> + * <ol> + * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper + * regions based on HBase region boundaries.</li> + * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and + * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into + * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys + + * column families + qualifiers + timestamps + values).</li> + * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start + * key, end key, row count, hash) from both clusters and compares the hashes. 
Matching hashes mean + * the chunk data is identical; mismatched hashes indicate inconsistency.</li> + * <li><b>Result Tracking:</b> Results are check pointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT} + * table, tracking verified chunks, mismatched chunks, and processing progress for resumable + * operations.</li> + * </ol> + * <h2>Usage Example</h2> + * + * <pre> + * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \ --table-name MY_TABLE \ + * --target-cluster target-zk1,target-zk2:2181:/hbase + * </pre> + */ +public class PhoenixSyncTableTool extends Configured implements Tool { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class); + + private static final Option SCHEMA_NAME_OPTION = + new Option("s", "schema", true, "Phoenix schema name (optional)"); + private static final Option TABLE_NAME_OPTION = + new Option("tn", "table-name", true, "Table name (mandatory)"); + private static final Option TARGET_CLUSTER_OPTION = + new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)"); + private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true, + "Start time in milliseconds for sync (optional, defaults to 0)"); + private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true, + "End time in milliseconds for sync (optional, defaults to current time - 1 hour)"); + private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false, + "Dry run mode - only checkpoint inconsistencies, do not repair (optional)"); + private static final Option CHUNK_SIZE_OPTION = + new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)"); + private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false, + "Run the job in foreground. Default - Runs the job in background."); + private static final Option TENANT_ID_OPTION = + new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)"); + private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); + + private String schemaName; + private String tableName; + private String targetZkQuorum; + private Long startTime; + private Long endTime; + private boolean isDryRun; + private Long chunkSizeBytes; + private boolean isForeground; + private String tenantId; + + private String qTable; + private String qSchemaName; + + private Configuration configuration; + private Job job; + private PTable pTable; + + /** + * Creates an MR job that uses server-side chunking and checksum calculation + * @return Configured MapReduce job ready for submission + * @throws Exception if job creation fails + */ + private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception { + configureTimeoutsAndRetries(configuration); + setPhoenixSyncTableToolConfiguration(configuration); + PhoenixMRJobUtil.updateCapacityQueueInfo(configuration); + Job job = Job.getInstance(configuration, getJobName()); + job.setMapperClass(PhoenixSyncTableMapper.class); + job.setJarByClass(PhoenixSyncTableTool.class); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + configureInput(job, tableType); + configureOutput(job); + obtainTargetClusterTokens(job); + return job; + } + + /** + * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is + * required for cross-cluster kerberos authentication. 
+ * @param job The MapReduce job to add tokens + */ + private void obtainTargetClusterTokens(Job job) throws IOException { + Configuration targetConf = + PhoenixMapReduceUtil.createConfigurationForZkQuorum(job.getConfiguration(), targetZkQuorum); + TableMapReduceUtil.initCredentialsForCluster(job, targetConf); + } + + /** + * Configures timeouts and retry settings for the sync job + */ + private void configureTimeoutsAndRetries(Configuration configuration) { + long syncTableQueryTimeoutMs = + configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT); + long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT); + long syncTableClientScannerTimeoutMs = + configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT); + int syncTableRpcRetriesCounter = + configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER); + + configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + Long.toString(syncTableClientScannerTimeoutMs)); + configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs)); + configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + Integer.toString(syncTableRpcRetriesCounter)); + configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs)); + } + + private void setPhoenixSyncTableToolConfiguration(Configuration configuration) { + PhoenixConfigurationUtil.setPhoenixSyncTableName(configuration, qTable); + PhoenixConfigurationUtil.setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum); + PhoenixConfigurationUtil.setPhoenixSyncTableFromTime(configuration, startTime); + PhoenixConfigurationUtil.setPhoenixSyncTableToTime(configuration, endTime); + PhoenixConfigurationUtil.setPhoenixSyncTableDryRun(configuration, isDryRun); + PhoenixConfigurationUtil.setSplitByStats(configuration, false); + if (chunkSizeBytes != null) { + PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes); + } + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } + PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime); + configuration + .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true); + } + + private void configureInput(Job job, PTableType tableType) { + // With below query plan, we get Input split based on region boundary + String hint = (tableType == PTableType.INDEX) ? 
"" : "/*+ NO_INDEX */ "; + String selectStatement = "SELECT " + hint + "1 FROM " + qTable; + PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class, + PhoenixSyncTableInputFormat.class, qTable, selectStatement); + } + + private void configureOutput(Job job) { + job.setNumReduceTasks(0); + job.setOutputFormatClass(NullOutputFormat.class); + } + + private String getJobName() { + StringBuilder jobName = new StringBuilder("PhoenixSyncTableTool"); + if (qSchemaName != null) { + jobName.append("-").append(qSchemaName); + } + jobName.append("-").append(tableName); + jobName.append("-").append(System.currentTimeMillis()); + return jobName.toString(); + } + + public CommandLine parseOptions(String[] args) throws IllegalStateException { + Options options = getOptions(); + CommandLineParser parser = DefaultParser.builder().setAllowPartialMatching(false) + .setStripLeadingAndTrailingQuotes(false).build(); + CommandLine cmdLine = null; + try { + cmdLine = parser.parse(options, args); + } catch (ParseException e) { + LOGGER.error("Failed to parse command line options. Args: {}. Error: {}", + Arrays.toString(args), e.getMessage(), e); + printHelpAndExit("Error parsing command line options: " + e.getMessage(), options); + } + + if (cmdLine.hasOption(HELP_OPTION.getOpt())) { + printHelpAndExit(options, 0); + } + requireOption(cmdLine, TABLE_NAME_OPTION); + requireOption(cmdLine, TARGET_CLUSTER_OPTION); + return cmdLine; + } + + private void requireOption(CommandLine cmdLine, Option option) { + if (!cmdLine.hasOption(option.getOpt())) { + throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter"); + } + } + + private Options getOptions() { + Options options = new Options(); + options.addOption(SCHEMA_NAME_OPTION); + options.addOption(TABLE_NAME_OPTION); + options.addOption(TARGET_CLUSTER_OPTION); + options.addOption(FROM_TIME_OPTION); + options.addOption(TO_TIME_OPTION); + options.addOption(DRY_RUN_OPTION); + options.addOption(CHUNK_SIZE_OPTION); + options.addOption(RUN_FOREGROUND_OPTION); + options.addOption(TENANT_ID_OPTION); + options.addOption(HELP_OPTION); + return options; + } + + private void printHelpAndExit(String errorMessage, Options options) { + System.err.println(errorMessage); + printHelpAndExit(options, -1); + } + + private void printHelpAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("hadoop jar phoenix-server.jar " + PhoenixSyncTableTool.class.getName(), + "Synchronize a Phoenix table between source and target clusters", options, + "\nExample usage:\n" + + "hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n" + + " --table-name MY_TABLE \\\n" + " --target-cluster <zk_quorum>:2181 \\\n" + + " --dry-run\n", + true); + System.exit(exitCode); + } + + public void populateSyncTableToolAttributes(CommandLine cmdLine) { + tableName = cmdLine.getOptionValue(TABLE_NAME_OPTION.getOpt()); + targetZkQuorum = cmdLine.getOptionValue(TARGET_CLUSTER_OPTION.getOpt()); + schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); + + if (cmdLine.hasOption(FROM_TIME_OPTION.getOpt())) { + startTime = Long.valueOf(cmdLine.getOptionValue(FROM_TIME_OPTION.getOpt())); + } else { + startTime = 0L; + } + + if (cmdLine.hasOption(TO_TIME_OPTION.getOpt())) { + endTime = Long.valueOf(cmdLine.getOptionValue(TO_TIME_OPTION.getOpt())); + } else { + // Default endTime, current time - 1 hour + endTime = EnvironmentEdgeManager.currentTimeMillis() - (60 * 60 * 1000); + } + + if 
(cmdLine.hasOption(CHUNK_SIZE_OPTION.getOpt())) { + chunkSizeBytes = Long.valueOf(cmdLine.getOptionValue(CHUNK_SIZE_OPTION.getOpt())); + if (chunkSizeBytes <= 0) { + throw new IllegalArgumentException( + "Chunk size must be a positive value, got: " + chunkSizeBytes); + } + } + if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) { + tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); + } + isDryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt()); + isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); + qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName); + qSchemaName = SchemaUtil.normalizeIdentifier(schemaName); + PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable); + PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable); Review Comment: I just saw your other comments on setRaw(). I agree, if we don't do a raw scan, then we don't have to worry about these aspects, but if the HA guarantee includes the history (e.g., CDC), then do we have a choice? ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations.
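For context on the setRaw() review comment above: verifying replicated history (delete markers plus older cell versions, as a CDC-style guarantee would imply) maps onto standard HBase client scan options; an illustrative fragment, not the PR's code:

    // Standard HBase Scan options: a raw scan keeps delete markers, and
    // readAllVersions() keeps every cell version in the time range.
    Scan scan = new Scan();
    scan.setRaw(true);
    scan.readAllVersions();
    scan.setTimeRange(fromTime, toTime); // declared to throw IOException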
The caller retains ownership and is responsible for + * connection lifecycle. + * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row) + throws SQLException { + + // Validate required parameters + if (row.getTableName() == null || row.getTableName().isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (row.getType() == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (row.getFromTime() == null || row.getToTime() == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } Review Comment: Edited the comment to add the missing `Preconditions`. ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
##########
phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Status;
+import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint
+ * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed
+ * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks)
+ */
+public class PhoenixSyncTableOutputRepository {
+
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class);
+  public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT";
+  private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days
+  private final Connection connection;
+  private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO "
+    + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME,"
+    + " START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME,"
+    + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+
+  /**
+   * Creates a repository for managing sync table checkpoint operations. Note: The connection is
+   * stored as-is and shared across operations. The caller retains ownership and is responsible for
+   * connection lifecycle.
+   * @param connection Phoenix connection (must remain open for repository lifetime)
+   */
+  public PhoenixSyncTableOutputRepository(Connection connection) {
+    this.connection = connection;
+  }
+
+  public void createSyncCheckpointTableIfNotExists() throws SQLException {
+    String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n"
+      + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n"
+      + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n"
+      + " TO_TIME BIGINT NOT NULL,\n" + " START_ROW_KEY VARBINARY_ENCODED,\n"
+      + " END_ROW_KEY VARBINARY_ENCODED,\n" + " IS_DRY_RUN BOOLEAN, \n"
+      + " EXECUTION_START_TIME TIMESTAMP,\n" + " EXECUTION_END_TIME TIMESTAMP,\n"
+      + " STATUS VARCHAR(20),\n" + " COUNTERS VARCHAR(255), \n"
+      + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + " TARGET_CLUSTER,\n"
+      + " TYPE ,\n" + " FROM_TIME,\n" + " TO_TIME,\n"
+      + " START_ROW_KEY )" + ") TTL=" + OUTPUT_TABLE_TTL_SECONDS;
+
+    try (Statement stmt = connection.createStatement()) {
+      stmt.execute(ddl);
+      connection.commit();
+      LOGGER.info("Successfully created or verified existence of {} table",
+        SYNC_TABLE_CHECKPOINT_TABLE_NAME);
+    }
+  }
+
+  public void checkpointSyncTableResult(String tableName, String targetCluster, Type type,
+    Long fromTime, Long toTime, boolean isDryRun, byte[] startKey, byte[] endKey, Status status,
+    Timestamp executionStartTime, Timestamp executionEndTime, String counters) throws SQLException {
+
+    // Validate required parameters
+    if (tableName == null || tableName.isEmpty()) {
+      throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint");
+    }
+    if (targetCluster == null || targetCluster.isEmpty()) {
+      throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint");
+    }
+    if (type == null) {
+      throw new IllegalArgumentException("Type cannot be null for checkpoint");
+    }
+    if (fromTime == null || toTime == null) {
+      throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint");
+    }
+
+    try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) {
+      ps.setString(1, tableName);
+      ps.setString(2, targetCluster);
+      ps.setString(3, type.name());
+      ps.setLong(4, fromTime);
+      ps.setLong(5, toTime);
+      ps.setBytes(6, startKey);
+      ps.setBytes(7, endKey);
+      ps.setBoolean(8, isDryRun);
+      ps.setTimestamp(9, executionStartTime);
+      ps.setTimestamp(10, executionEndTime);
+      ps.setString(11, status != null ? status.name() : null);
+      ps.setString(12, counters);
+      ps.executeUpdate();
+      connection.commit();
+    }
+  }
+
+  /**
+   * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out
+   * already-processed regions.
+   * @param tableName Source table name
+   * @param targetCluster Target cluster ZK quorum
+   * @param fromTime Start timestamp (nullable)
+   * @param toTime End timestamp (nullable)
+   * @return List of completed mapper regions
+   */
+  public List<PhoenixSyncTableOutputRow> getProcessedMapperRegions(String tableName,
+    String targetCluster, Long fromTime, Long toTime) throws SQLException {
+
+    String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?"
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)";
+    List<PhoenixSyncTableOutputRow> results = new ArrayList<>();
+    try (PreparedStatement ps = connection.prepareStatement(query)) {
+      int paramIndex = 1;
+      ps.setString(paramIndex++, tableName);
+      ps.setString(paramIndex++, targetCluster);
+      ps.setString(paramIndex++, Type.MAPPER_REGION.name());
+      ps.setLong(paramIndex++, fromTime);
+      ps.setLong(paramIndex++, toTime);
+      ps.setString(paramIndex++, Status.VERIFIED.name());
+      ps.setString(paramIndex, Status.MISMATCHED.name());
+      try (ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+          PhoenixSyncTableOutputRow row =
+            new PhoenixSyncTableOutputRow.Builder().setStartRowKey(rs.getBytes("START_ROW_KEY"))
+              .setEndRowKey(rs.getBytes("END_ROW_KEY")).build();
+          results.add(row);
+        }
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip already-processed chunks.
+   * @param tableName Source table name
+   * @param targetCluster Target cluster ZK quorum
+   * @param fromTime Start timestamp (nullable)
+   * @param toTime End timestamp (nullable)
+   * @param mapperRegionStart Mapper region start key
+   * @param mapperRegionEnd Mapper region end key
+   * @return List of processed chunks in the region
+   */
+  public List<PhoenixSyncTableOutputRow> getProcessedChunks(String tableName, String targetCluster,
+    Long fromTime, Long toTime, byte[] mapperRegionStart, byte[] mapperRegionEnd)
+    throws SQLException {
+    StringBuilder queryBuilder = new StringBuilder();
+    queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME
+      + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? "
+      + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?");
+
+    // Check if mapper region boundaries are non-empty (i.e., NOT first/last regions)
+    // Only add boundary conditions for non-empty boundaries
+    boolean hasEndBoundary = mapperRegionEnd != null && mapperRegionEnd.length > 0;
+    boolean hasStartBoundary = mapperRegionStart != null && mapperRegionStart.length > 0;
+
+    // Filter chunks that overlap with this mapper region:
+    // - Chunk overlaps if: chunkStart < mapperRegionEnd (when end boundary exists)
+    // - Chunk overlaps if: chunkEnd > mapperRegionStart (when start boundary exists)
+    if (hasEndBoundary) {
+      queryBuilder.append(" AND START_ROW_KEY <= ?");
+    }
+    if (hasStartBoundary) {
+      queryBuilder.append(" AND END_ROW_KEY >= ?");

Review Comment:
   Perhaps we should add " AND START_ROW_KEY > <0x00>"? You may want to check the query plan with and without this constraint to see whether it helps.
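On checking the plan: Phoenix exposes it through `EXPLAIN`, so the generated statement can be compared with and without the extra `START_ROW_KEY` predicate. A minimal JDBC sketch; the connection URL and the literal predicate values are placeholders, not values from this PR:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class ExplainPlanSketch {
  // Prints Phoenix's query plan for one form of the checkpoint query. Run it
  // once as-is and once with the extra "AND START_ROW_KEY > ..." predicate
  // appended, then compare the scan boundaries the two plans report.
  public static void main(String[] args) throws SQLException {
    String url = "jdbc:phoenix:localhost:2181"; // placeholder ZK quorum
    String sql = "SELECT START_ROW_KEY, END_ROW_KEY FROM PHOENIX_SYNC_TABLE_CHECKPOINT"
      + " WHERE TABLE_NAME = 'T' AND TARGET_CLUSTER = 'C' AND TYPE = 'CHUNK'"
      + " AND FROM_TIME = 0 AND TO_TIME = 1"; // placeholder literals
    try (Connection conn = DriverManager.getConnection(url);
      Statement stmt = conn.createStatement();
      ResultSet rs = stmt.executeQuery("EXPLAIN " + sql)) {
      while (rs.next()) {
        System.out.println(rs.getString(1)); // one plan step per row
      }
    }
  }
}
```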

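To tie the repository API together, a hypothetical end-to-end usage of this revision's flat-parameter variant; the connection URL, table name, cluster string, keys, and counters value are all placeholders:

```java
package org.apache.phoenix.mapreduce;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Timestamp;
import java.util.List;
import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Status;
import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Type;

public class CheckpointUsageSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder URL; in the tool the connection comes from the job setup.
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")) {
      PhoenixSyncTableOutputRepository repo = new PhoenixSyncTableOutputRepository(conn);
      repo.createSyncCheckpointTableIfNotExists();

      long now = System.currentTimeMillis();
      // Record one completed mapper region.
      repo.checkpointSyncTableResult("MY_TABLE", "target-zk:2181", Type.MAPPER_REGION,
        0L, now, /* isDryRun */ true, new byte[] { 0x01 }, new byte[] { 0x02 },
        Status.VERIFIED, new Timestamp(now - 1000), new Timestamp(now), "rows=42");

      // On a restart, the input format would skip these regions.
      List<PhoenixSyncTableOutputRow> done =
        repo.getProcessedMapperRegions("MY_TABLE", "target-zk:2181", 0L, now);
      System.out.println("Already processed regions: " + done.size());
    }
  }
}
```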