haridsv commented on code in PR #2379: URL: https://github.com/apache/phoenix/pull/2379#discussion_r3001509296
########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. 
Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. + * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + 
OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row) + throws SQLException { + + // Validate required parameters + if (row.getTableName() == null || row.getTableName().isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (row.getType() == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (row.getFromTime() == null || row.getToTime() == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } + + try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) { + ps.setString(1, row.getTableName()); + ps.setString(2, row.getTargetCluster()); + ps.setString(3, row.getType().name()); Review Comment: I would recommend storing a byte code rather than a long string to reduce the size of the row key. ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java: ########## @@ -223,4 +233,100 @@ public static void setTenantId(final Job job, final String tenantId) { PhoenixConfigurationUtil.setTenantId(job.getConfiguration(), tenantId); } + /** + * Validates that start and end times are in the past and start < end. 
+ * @param startTime Start timestamp in millis (nullable, defaults to 0) + * @param endTime End timestamp in millis (nullable, defaults to current time) + * @param tableName Table name for error messages + * @throws IllegalArgumentException if time range is invalid + */ + public static void validateTimeRange(Long startTime, Long endTime, String tableName) { + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); + long st = (startTime == null) ? 0L : startTime; + long et = (endTime == null) ? currentTime : endTime; + + if (et > currentTime || st >= et) { + throw new IllegalArgumentException(String.format( + "%s %s: start and end times must be in the past " + + "and start < end. Start: %d, End: %d, Current: %d", + INVALID_TIME_RANGE_EXCEPTION_MESSAGE, tableName, st, et, currentTime)); + } + } + + /** + * Validates that the end time doesn't exceed the max lookback age configured in Phoenix. + * @param configuration Hadoop configuration + * @param endTime End timestamp in millis + * @param tableName Table name for error messages + * @throws IllegalArgumentException if endTime is before min allowed timestamp + */ + public static void validateMaxLookbackAge(Configuration configuration, Long endTime, + String tableName) { + long maxLookBackAge = BaseScannerRegionObserverConstants.getMaxLookbackInMillis(configuration); + if (maxLookBackAge > 0) { + long minTimestamp = EnvironmentEdgeManager.currentTimeMillis() - maxLookBackAge; + if (endTime < minTimestamp) { + throw new IllegalArgumentException(String.format( + "Table %s can't look back past the configured max lookback age: %d ms. " + + "End time: %d, Min allowed timestamp: %d", + tableName, maxLookBackAge, endTime, minTimestamp)); + } + } + } + + /** + * Validates that a table is suitable for MR operations. Checks table existence, type, and state. 
+ * @param connection Phoenix connection + * @param qualifiedTableName Qualified table name + * @param allowViews Whether to allow VIEW tables + * @param allowIndexes Whether to allow INDEX tables + * @return PTable instance + * @throws SQLException if connection fails + * @throws IllegalArgumentException if validation fails + */ + public static PTable validateTableForMRJob(Connection connection, String qualifiedTableName, Review Comment: How about? ```suggestion public static PTable getPTableWithValidation(Connection connection, String qualifiedTableName, ``` ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. 
This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. 
+ * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row) + throws SQLException { + + // Validate required parameters + if (row.getTableName() == null || row.getTableName().isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (row.getType() == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (row.getFromTime() == null || row.getToTime() == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } + + try 
(PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) { + ps.setString(1, row.getTableName()); + ps.setString(2, row.getTargetCluster()); + ps.setString(3, row.getType().name()); + ps.setLong(4, row.getFromTime()); + ps.setLong(5, row.getToTime()); + ps.setString(6, row.getTenantId()); + ps.setBytes(7, row.getStartRowKey()); + ps.setBytes(8, row.getEndRowKey()); + ps.setBoolean(9, row.getIsDryRun()); + ps.setTimestamp(10, row.getExecutionStartTime()); + ps.setTimestamp(11, row.getExecutionEndTime()); + ps.setString(12, row.getStatus() != null ? row.getStatus().name() : null); Review Comment: Similar to type, though not as important, it would be better to store a code rather than the name. ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Properties; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixMRJobUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser; +import 
org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException; + +/** + * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across + * two HBase clusters (source and target). + * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where + * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects + * which data chunks are out of sync without transferring all the data over the network. + * <h2>How It Works</h2> + * <ol> + * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper + * regions based on HBase region boundaries.</li> + * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and + * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into + * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys + + * column families + qualifiers + timestamps + values).</li> + * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start + * key, end key, row count, hash) from both clusters and compares the hashes. 
Matching hashes mean + * the chunk data is identical; mismatched hashes indicate inconsistency.</li> + * <li><b>Result Tracking:</b> Results are check pointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT} + * table, tracking verified chunks, mismatched chunks, and processing progress for resumable + * operations.</li> + * </ol> + * <h2>Usage Example</h2> + * + * <pre> + * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \ --table-name MY_TABLE \ + * --target-cluster target-zk1,target-zk2:2181:/hbase + * </pre> + */ +public class PhoenixSyncTableTool extends Configured implements Tool { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class); + + private static final Option SCHEMA_NAME_OPTION = + new Option("s", "schema", true, "Phoenix schema name (optional)"); + private static final Option TABLE_NAME_OPTION = + new Option("tn", "table-name", true, "Table name (mandatory)"); + private static final Option TARGET_CLUSTER_OPTION = + new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)"); + private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true, + "Start time in milliseconds for sync (optional, defaults to 0)"); + private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true, + "End time in milliseconds for sync (optional, defaults to current time - 1 hour)"); + private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false, + "Dry run mode - only checkpoint inconsistencies, do not repair (optional)"); + private static final Option CHUNK_SIZE_OPTION = + new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)"); + private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false, + "Run the job in foreground. 
Default - Runs the job in background."); + private static final Option TENANT_ID_OPTION = + new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)"); + private static final Option RAW_SCAN_OPTION = new Option("rs", "raw-scan", false, + "Enable raw scan mode to include delete markers (optional, disabled by default)"); + private static final Option READ_ALL_VERSIONS_OPTION = new Option("rav", "read-all-versions", + false, + "Enable reading all cell versions (optional, disabled by default, reads only latest version)"); + private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); + + public static final String PHOENIX_SYNC_TABLE_NAME = "phoenix.sync.table.table.name"; + public static final String PHOENIX_SYNC_TABLE_TARGET_ZK_QUORUM = "phoenix.sync.table.target.zk"; + public static final String PHOENIX_SYNC_TABLE_FROM_TIME = "phoenix.sync.table.from.time"; + public static final String PHOENIX_SYNC_TABLE_TO_TIME = "phoenix.sync.table.to.time"; + public static final String PHOENIX_SYNC_TABLE_DRY_RUN = "phoenix.sync.table.dry.run"; + public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = + "phoenix.sync.table.chunk.size.bytes"; + public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1GB + public static final String PHOENIX_SYNC_TABLE_RAW_SCAN = "phoenix.sync.table.raw.scan"; + public static final String PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS = + "phoenix.sync.table.read.all.versions"; + + private String schemaName; + private String tableName; + private String targetZkQuorum; + private Long startTime; + private Long endTime; + private boolean isDryRun; + private Long chunkSizeBytes; + private boolean isForeground; + private String tenantId; + private boolean isRawScan = false; + private boolean isReadAllVersions = false; + + private String qTable; + private String qSchemaName; + + private Configuration configuration; + private Job job; + + /** + * Creates an MR job 
that uses server-side chunking and checksum calculation + * @return Configured MapReduce job ready for submission + * @throws Exception if job creation fails + */ + private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception { + configureTimeoutsAndRetries(configuration); + setPhoenixSyncTableToolConfiguration(configuration); + PhoenixMRJobUtil.updateCapacityQueueInfo(configuration); + Job job = Job.getInstance(configuration, getJobName()); + job.setMapperClass(PhoenixSyncTableMapper.class); + job.setJarByClass(PhoenixSyncTableTool.class); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + configureInput(job, tableType); + configureOutput(job); + obtainTargetClusterTokens(job); + return job; + } + + /** + * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is + * required for cross-cluster kerberos authentication. + * @param job The MapReduce job to add tokens + */ + private void obtainTargetClusterTokens(Job job) throws IOException { + try { + Configuration targetConf = + HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum); + TableMapReduceUtil.initCredentialsForCluster(job, targetConf); + } catch (IOException e) { + if (UserProvider.instantiate(job.getConfiguration()).isHBaseSecurityEnabled()) { + throw e; + } + LOGGER.info("Skipping target cluster token acquisition (security not enabled): {}", + e.getMessage()); + } + } + + /** + * Configures timeouts and retry settings for the sync job + */ + private void configureTimeoutsAndRetries(Configuration configuration) { + long syncTableQueryTimeoutMs = + configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT); + long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT); + long syncTableClientScannerTimeoutMs = + 
configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT); + int syncTableRpcRetriesCounter = + configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER); + + configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + Long.toString(syncTableClientScannerTimeoutMs)); + configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs)); + configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + Integer.toString(syncTableRpcRetriesCounter)); + configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs)); + } + + private void setPhoenixSyncTableToolConfiguration(Configuration configuration) { + setPhoenixSyncTableName(configuration, qTable); + setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum); + setPhoenixSyncTableFromTime(configuration, startTime); + setPhoenixSyncTableToTime(configuration, endTime); + setPhoenixSyncTableDryRun(configuration, isDryRun); + setPhoenixSyncTableRawScan(configuration, isRawScan); + setPhoenixSyncTableReadAllVersions(configuration, isReadAllVersions); + PhoenixConfigurationUtil.setSplitByStats(configuration, false); + if (chunkSizeBytes != null) { + setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes); + } + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } + PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime); + configuration + .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true); + } + + private void configureInput(Job job, PTableType tableType) { + // With below query plan, we get Input split based on region boundary + String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ "; Review Comment: We can't have an index on the index, so wondering what the purpose of this hint is. 
########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java: ########## @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Properties; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import 
org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixMRJobUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException; + +/** + * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across + * two HBase clusters (source and target). + * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where + * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects + * which data chunks are out of sync without transferring all the data over the network. + * <h2>How It Works</h2> + * <ol> + * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper + * regions based on HBase region boundaries.</li> + * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and + * target clusters. 
The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into + * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys + + * column families + qualifiers + timestamps + values).</li> + * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start + * key, end key, row count, hash) from both clusters and compares the hashes. Matching hashes mean + * the chunk data is identical; mismatched hashes indicate inconsistency.</li> + * <li><b>Result Tracking:</b> Results are check pointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT} + * table, tracking verified chunks, mismatched chunks, and processing progress for resumable + * operations.</li> + * </ol> + * <h2>Usage Example</h2> + * + * <pre> + * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \ --table-name MY_TABLE \ + * --target-cluster target-zk1,target-zk2:2181:/hbase + * </pre> + */ +public class PhoenixSyncTableTool extends Configured implements Tool { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class); + + private static final Option SCHEMA_NAME_OPTION = + new Option("s", "schema", true, "Phoenix schema name (optional)"); + private static final Option TABLE_NAME_OPTION = + new Option("tn", "table-name", true, "Table name (mandatory)"); + private static final Option TARGET_CLUSTER_OPTION = + new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)"); + private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true, + "Start time in milliseconds for sync (optional, defaults to 0)"); + private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true, + "End time in milliseconds for sync (optional, defaults to current time - 1 hour)"); + private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false, + "Dry run mode - only checkpoint inconsistencies, do not repair (optional)"); + private static final Option 
CHUNK_SIZE_OPTION = + new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)"); + private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false, + "Run the job in foreground. Default - Runs the job in background."); + private static final Option TENANT_ID_OPTION = + new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)"); + private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); + + private String schemaName; + private String tableName; + private String targetZkQuorum; + private Long startTime; + private Long endTime; + private boolean isDryRun; + private Long chunkSizeBytes; + private boolean isForeground; + private String tenantId; + + private String qTable; + private String qSchemaName; + + private Configuration configuration; + private Job job; + private PTable pTable; + + /** + * Creates an MR job that uses server-side chunking and checksum calculation + * @return Configured MapReduce job ready for submission + * @throws Exception if job creation fails + */ + private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception { + configureTimeoutsAndRetries(configuration); + setPhoenixSyncTableToolConfiguration(configuration); + PhoenixMRJobUtil.updateCapacityQueueInfo(configuration); + Job job = Job.getInstance(configuration, getJobName()); + job.setMapperClass(PhoenixSyncTableMapper.class); + job.setJarByClass(PhoenixSyncTableTool.class); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + configureInput(job, tableType); + configureOutput(job); + obtainTargetClusterTokens(job); + return job; + } + + /** + * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is + * required for cross-cluster kerberos authentication. 
+ * @param job The MapReduce job to add tokens + */ + private void obtainTargetClusterTokens(Job job) throws IOException { + Configuration targetConf = + PhoenixMapReduceUtil.createConfigurationForZkQuorum(job.getConfiguration(), targetZkQuorum); + TableMapReduceUtil.initCredentialsForCluster(job, targetConf); + } + + /** + * Configures timeouts and retry settings for the sync job + */ + private void configureTimeoutsAndRetries(Configuration configuration) { + long syncTableQueryTimeoutMs = + configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT); + long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT); + long syncTableClientScannerTimeoutMs = + configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT); + int syncTableRpcRetriesCounter = + configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER); + + configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + Long.toString(syncTableClientScannerTimeoutMs)); + configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs)); + configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + Integer.toString(syncTableRpcRetriesCounter)); + configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs)); + } + + private void setPhoenixSyncTableToolConfiguration(Configuration configuration) { + PhoenixConfigurationUtil.setPhoenixSyncTableName(configuration, qTable); + PhoenixConfigurationUtil.setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum); + PhoenixConfigurationUtil.setPhoenixSyncTableFromTime(configuration, startTime); + PhoenixConfigurationUtil.setPhoenixSyncTableToTime(configuration, endTime); + 
PhoenixConfigurationUtil.setPhoenixSyncTableDryRun(configuration, isDryRun); + PhoenixConfigurationUtil.setSplitByStats(configuration, false); + if (chunkSizeBytes != null) { + PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes); + } + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } + PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime); + configuration + .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true); + } + + private void configureInput(Job job, PTableType tableType) { + // With below query plan, we get Input split based on region boundary + String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ "; + String selectStatement = "SELECT " + hint + "1 FROM " + qTable; + PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class, + PhoenixSyncTableInputFormat.class, qTable, selectStatement); + } + + private void configureOutput(Job job) { + job.setNumReduceTasks(0); + job.setOutputFormatClass(NullOutputFormat.class); + } + + private String getJobName() { + StringBuilder jobName = new StringBuilder("PhoenixSyncTableTool"); + if (qSchemaName != null) { + jobName.append("-").append(qSchemaName); + } + jobName.append("-").append(tableName); + jobName.append("-").append(System.currentTimeMillis()); + return jobName.toString(); + } + + public CommandLine parseOptions(String[] args) throws IllegalStateException { + Options options = getOptions(); + CommandLineParser parser = DefaultParser.builder().setAllowPartialMatching(false) + .setStripLeadingAndTrailingQuotes(false).build(); + CommandLine cmdLine = null; + try { + cmdLine = parser.parse(options, args); + } catch (ParseException e) { + LOGGER.error("Failed to parse command line options. Args: {}. 
Error: {}", + Arrays.toString(args), e.getMessage(), e); + printHelpAndExit("Error parsing command line options: " + e.getMessage(), options); + } + + if (cmdLine.hasOption(HELP_OPTION.getOpt())) { + printHelpAndExit(options, 0); + } + requireOption(cmdLine, TABLE_NAME_OPTION); + requireOption(cmdLine, TARGET_CLUSTER_OPTION); + return cmdLine; + } + + private void requireOption(CommandLine cmdLine, Option option) { + if (!cmdLine.hasOption(option.getOpt())) { + throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter"); + } + } + + private Options getOptions() { + Options options = new Options(); + options.addOption(SCHEMA_NAME_OPTION); + options.addOption(TABLE_NAME_OPTION); + options.addOption(TARGET_CLUSTER_OPTION); + options.addOption(FROM_TIME_OPTION); + options.addOption(TO_TIME_OPTION); + options.addOption(DRY_RUN_OPTION); + options.addOption(CHUNK_SIZE_OPTION); + options.addOption(RUN_FOREGROUND_OPTION); + options.addOption(TENANT_ID_OPTION); + options.addOption(HELP_OPTION); + return options; + } + + private void printHelpAndExit(String errorMessage, Options options) { + System.err.println(errorMessage); + printHelpAndExit(options, -1); + } + + private void printHelpAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("hadoop jar phoenix-server.jar " + PhoenixSyncTableTool.class.getName(), + "Synchronize a Phoenix table between source and target clusters", options, + "\nExample usage:\n" + + "hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n" + + " --table-name MY_TABLE \\\n" + " --target-cluster <zk_quorum>:2181 \\\n" + + " --dry-run\n", + true); + System.exit(exitCode); + } + + public void populateSyncTableToolAttributes(CommandLine cmdLine) { + tableName = cmdLine.getOptionValue(TABLE_NAME_OPTION.getOpt()); + targetZkQuorum = cmdLine.getOptionValue(TARGET_CLUSTER_OPTION.getOpt()); + schemaName = 
cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); + + if (cmdLine.hasOption(FROM_TIME_OPTION.getOpt())) { + startTime = Long.valueOf(cmdLine.getOptionValue(FROM_TIME_OPTION.getOpt())); + } else { + startTime = 0L; + } + + if (cmdLine.hasOption(TO_TIME_OPTION.getOpt())) { + endTime = Long.valueOf(cmdLine.getOptionValue(TO_TIME_OPTION.getOpt())); + } else { + // Default endTime, current time - 1 hour + endTime = EnvironmentEdgeManager.currentTimeMillis() - (60 * 60 * 1000); + } + + if (cmdLine.hasOption(CHUNK_SIZE_OPTION.getOpt())) { + chunkSizeBytes = Long.valueOf(cmdLine.getOptionValue(CHUNK_SIZE_OPTION.getOpt())); + if (chunkSizeBytes <= 0) { + throw new IllegalArgumentException( + "Chunk size must be a positive value, got: " + chunkSizeBytes); + } + } + if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) { + tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); + } + isDryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt()); + isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); + qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName); + qSchemaName = SchemaUtil.normalizeIdentifier(schemaName); + PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable); + PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable); Review Comment: Don't we need to validate that the startTime is within the max lookback window? If startTime is beyond the window, background compactions on the source and target clusters may have purged different sets of historical versions or delete markers. Since these compactions don't run in sync, the tool will see different data states on each cluster, leading to false-positive mismatches for data that is actually consistent but has simply been cleaned up at different times. 
########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. 
Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. + * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + 
OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", Review Comment: No verification is being done, perhaps you want to say something like this? ```suggestion LOGGER.info("Initialization of checkpoint table {} complete", ``` ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. 
Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. + * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + 
OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row) + throws SQLException { + + // Validate required parameters + if (row.getTableName() == null || row.getTableName().isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (row.getType() == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (row.getFromTime() == null || row.getToTime() == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } Review Comment: I see you are already using `Preconditions` checks in a few other places — why not use them here (and in the other similar validation spots) as well, to keep the argument validation consistent and less verbose? ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. 
+ * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + OUTPUT_TABLE_TTL_SECONDS; Review Comment: Why is `UPSERT_CHECKPOINT_SQL` defined as a static constant while this DDL string is built inline in the method? It seems inconsistent. ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java: ########## @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Properties; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixMRJobUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser; +import 
org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException; + +/** + * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across + * two HBase clusters (source and target). + * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where + * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects + * which data chunks are out of sync without transferring all the data over the network. + * <h2>How It Works</h2> + * <ol> + * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper + * regions based on HBase region boundaries.</li> + * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and + * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into + * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys + + * column families + qualifiers + timestamps + values).</li> + * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start + * key, end key, row count, hash) from both clusters and compares the hashes. 
Matching hashes mean + * the chunk data is identical; mismatched hashes indicate inconsistency.</li> + * <li><b>Result Tracking:</b> Results are check pointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT} + * table, tracking verified chunks, mismatched chunks, and processing progress for resumable + * operations.</li> + * </ol> + * <h2>Usage Example</h2> + * + * <pre> + * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \ --table-name MY_TABLE \ + * --target-cluster target-zk1,target-zk2:2181:/hbase + * </pre> + */ +public class PhoenixSyncTableTool extends Configured implements Tool { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class); + + private static final Option SCHEMA_NAME_OPTION = + new Option("s", "schema", true, "Phoenix schema name (optional)"); + private static final Option TABLE_NAME_OPTION = + new Option("tn", "table-name", true, "Table name (mandatory)"); + private static final Option TARGET_CLUSTER_OPTION = + new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)"); + private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true, + "Start time in milliseconds for sync (optional, defaults to 0)"); + private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true, + "End time in milliseconds for sync (optional, defaults to current time - 1 hour)"); + private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false, + "Dry run mode - only checkpoint inconsistencies, do not repair (optional)"); + private static final Option CHUNK_SIZE_OPTION = + new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)"); + private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false, + "Run the job in foreground. 
Default - Runs the job in background."); + private static final Option TENANT_ID_OPTION = + new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)"); + private static final Option RAW_SCAN_OPTION = new Option("rs", "raw-scan", false, + "Enable raw scan mode to include delete markers (optional, disabled by default)"); + private static final Option READ_ALL_VERSIONS_OPTION = new Option("rav", "read-all-versions", + false, + "Enable reading all cell versions (optional, disabled by default, reads only latest version)"); + private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); + + public static final String PHOENIX_SYNC_TABLE_NAME = "phoenix.sync.table.table.name"; + public static final String PHOENIX_SYNC_TABLE_TARGET_ZK_QUORUM = "phoenix.sync.table.target.zk"; + public static final String PHOENIX_SYNC_TABLE_FROM_TIME = "phoenix.sync.table.from.time"; + public static final String PHOENIX_SYNC_TABLE_TO_TIME = "phoenix.sync.table.to.time"; + public static final String PHOENIX_SYNC_TABLE_DRY_RUN = "phoenix.sync.table.dry.run"; + public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = + "phoenix.sync.table.chunk.size.bytes"; + public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 * 1024 * 1024; // 1GB + public static final String PHOENIX_SYNC_TABLE_RAW_SCAN = "phoenix.sync.table.raw.scan"; + public static final String PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS = + "phoenix.sync.table.read.all.versions"; + + private String schemaName; + private String tableName; + private String targetZkQuorum; + private Long startTime; + private Long endTime; + private boolean isDryRun; + private Long chunkSizeBytes; + private boolean isForeground; + private String tenantId; + private boolean isRawScan = false; + private boolean isReadAllVersions = false; + + private String qTable; + private String qSchemaName; + + private Configuration configuration; + private Job job; + + /** + * Creates an MR job 
that uses server-side chunking and checksum calculation + * @return Configured MapReduce job ready for submission + * @throws Exception if job creation fails + */ + private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception { + configureTimeoutsAndRetries(configuration); + setPhoenixSyncTableToolConfiguration(configuration); + PhoenixMRJobUtil.updateCapacityQueueInfo(configuration); + Job job = Job.getInstance(configuration, getJobName()); + job.setMapperClass(PhoenixSyncTableMapper.class); + job.setJarByClass(PhoenixSyncTableTool.class); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + configureInput(job, tableType); + configureOutput(job); + obtainTargetClusterTokens(job); + return job; + } + + /** + * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is + * required for cross-cluster kerberos authentication. + * @param job The MapReduce job to add tokens + */ + private void obtainTargetClusterTokens(Job job) throws IOException { + try { + Configuration targetConf = + HBaseConfiguration.createClusterConf(job.getConfiguration(), targetZkQuorum); + TableMapReduceUtil.initCredentialsForCluster(job, targetConf); + } catch (IOException e) { + if (UserProvider.instantiate(job.getConfiguration()).isHBaseSecurityEnabled()) { + throw e; + } + LOGGER.info("Skipping target cluster token acquisition (security not enabled): {}", + e.getMessage()); + } + } + + /** + * Configures timeouts and retry settings for the sync job + */ + private void configureTimeoutsAndRetries(Configuration configuration) { + long syncTableQueryTimeoutMs = + configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT); + long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT); + long syncTableClientScannerTimeoutMs = + 
configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT); + int syncTableRpcRetriesCounter = + configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER); + + configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + Long.toString(syncTableClientScannerTimeoutMs)); + configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs)); + configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + Integer.toString(syncTableRpcRetriesCounter)); + configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs)); + } + + private void setPhoenixSyncTableToolConfiguration(Configuration configuration) { + setPhoenixSyncTableName(configuration, qTable); + setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum); + setPhoenixSyncTableFromTime(configuration, startTime); + setPhoenixSyncTableToTime(configuration, endTime); + setPhoenixSyncTableDryRun(configuration, isDryRun); + setPhoenixSyncTableRawScan(configuration, isRawScan); + setPhoenixSyncTableReadAllVersions(configuration, isReadAllVersions); + PhoenixConfigurationUtil.setSplitByStats(configuration, false); + if (chunkSizeBytes != null) { + setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes); + } + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } + PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime); + configuration + .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true); + } + + private void configureInput(Job job, PTableType tableType) { + // With below query plan, we get Input split based on region boundary + String hint = (tableType == PTableType.INDEX) ? 
"" : "/*+ NO_INDEX */ "; + String selectStatement = "SELECT " + hint + "1 FROM " + qTable; + PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class, + PhoenixSyncTableInputFormat.class, qTable, selectStatement); Review Comment: I noticed getSplits() calls getSelectStatement(), which seems to override the SELECT_STATEMENT config, am I reading it wrong? ########## phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java: ########## @@ -337,4 +337,5 @@ public void testIndexToolSourceConfig() { sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf); Assert.assertEquals(sourceTable, SourceTable.DATA_TABLE_SOURCE); } + Review Comment: Inadvertent change? ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableCheckpointOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " TENANT_ID, START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. 
+ * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " TENANT_ID VARCHAR,\n" + + " START_ROW_KEY VARBINARY_ENCODED,\n" + " END_ROW_KEY VARBINARY_ENCODED,\n" + + " IS_DRY_RUN BOOLEAN, \n" + " EXECUTION_START_TIME TIMESTAMP,\n" + + " EXECUTION_END_TIME TIMESTAMP,\n" + " STATUS VARCHAR(20),\n" + + " COUNTERS VARCHAR, \n" + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + + " TARGET_CLUSTER,\n" + " TYPE ,\n" + " FROM_TIME,\n" + + " TO_TIME,\n" + " TENANT_ID,\n" + " START_ROW_KEY )" + ") TTL=" + + OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(PhoenixSyncTableCheckpointOutputRow row) + throws SQLException { + + // Validate required parameters + if (row.getTableName() == null || row.getTableName().isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (row.getTargetCluster() == null || row.getTargetCluster().isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (row.getType() == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (row.getFromTime() == null || row.getToTime() == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } + + try 
(PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) { + ps.setString(1, row.getTableName()); + ps.setString(2, row.getTargetCluster()); + ps.setString(3, row.getType().name()); + ps.setLong(4, row.getFromTime()); + ps.setLong(5, row.getToTime()); + ps.setString(6, row.getTenantId()); + ps.setBytes(7, row.getStartRowKey()); + ps.setBytes(8, row.getEndRowKey()); + ps.setBoolean(9, row.getIsDryRun()); + ps.setTimestamp(10, row.getExecutionStartTime()); + ps.setTimestamp(11, row.getExecutionEndTime()); + ps.setString(12, row.getStatus() != null ? row.getStatus().name() : null); + ps.setString(13, row.getCounters()); + ps.executeUpdate(); + connection.commit(); + } + } + + /** + * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out + * already-processed regions. + * @param tableName Source table name + * @param targetCluster Target cluster ZK quorum + * @param fromTime Start timestamp + * @param toTime End timestamp + * @param tenantId Tenant ID + * @return List of completed mapper regions + */ + public List<PhoenixSyncTableCheckpointOutputRow> getProcessedMapperRegions(String tableName, + String targetCluster, Long fromTime, Long toTime, String tenantId) throws SQLException { + + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM ") + .append(SYNC_TABLE_CHECKPOINT_TABLE_NAME) + .append(" WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?") + .append(" AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? 
"); + + // Conditionally build TENANT_ID clause based on whether tenantId is null + if (tenantId == null) { + queryBuilder.append(" AND TENANT_ID IS NULL"); + } else { + queryBuilder.append(" AND TENANT_ID = ?"); + } + + queryBuilder.append( + " ORDER BY TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME, TENANT_ID, START_ROW_KEY"); + + List<PhoenixSyncTableCheckpointOutputRow> results = new ArrayList<>(); + try (PreparedStatement ps = connection.prepareStatement(queryBuilder.toString())) { + int paramIndex = 1; + ps.setString(paramIndex++, tableName); + ps.setString(paramIndex++, targetCluster); + ps.setString(paramIndex++, Type.REGION.name()); + ps.setLong(paramIndex++, fromTime); + ps.setLong(paramIndex++, toTime); + // Only bind tenantId parameter if non-null + if (tenantId != null) { + ps.setString(paramIndex, tenantId); + } + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + PhoenixSyncTableCheckpointOutputRow row = + new PhoenixSyncTableCheckpointOutputRow.Builder() + .setStartRowKey(rs.getBytes("START_ROW_KEY")).setEndRowKey(rs.getBytes("END_ROW_KEY")) + .build(); + results.add(row); + } + } + } + return results; + } + + /** + * Queries for processed chunks. Used by PhoenixSyncTableMapper to skip already-processed chunks. 
+ * @param tableName Source table name + * @param targetCluster Target cluster ZK quorum + * @param fromTime Start timestamp + * @param toTime End timestamp + * @param tenantId Tenant ID + * @param mapperRegionStart Mapper region start key + * @param mapperRegionEnd Mapper region end key + * @return List of processed chunks in the region + */ + public List<PhoenixSyncTableCheckpointOutputRow> getProcessedChunks(String tableName, + String targetCluster, Long fromTime, Long toTime, String tenantId, byte[] mapperRegionStart, + byte[] mapperRegionEnd) throws SQLException { + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ? " + + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ?"); + + // Conditionally build TENANT_ID clause based on whether tenantId is null + if (tenantId == null) { + queryBuilder.append(" AND TENANT_ID IS NULL"); + } else { + queryBuilder.append(" AND TENANT_ID = ?"); + } Review Comment: I don't see TENANT_ID column on the table, what am I missing? ########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java: ########## @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Properties; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.coprocessor.PhoenixSyncTableRegionScanner; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixMRJobUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter; 
+import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options; +import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException; + +/** + * A MapReduce tool for verifying and detecting data inconsistencies between Phoenix tables across + * two HBase clusters (source and target). + * <h2>Use Case</h2> This tool is designed for replication/migration verification scenarios where + * data is replicated from a source Phoenix cluster to a target cluster. It efficiently detects + * which data chunks are out of sync without transferring all the data over the network. + * <h2>How It Works</h2> + * <ol> + * <li><b>Job Setup:</b> The tool creates a MapReduce job that partitions the table into mapper + * regions based on HBase region boundaries.</li> + * <li><b>Server-Side Chunking:</b> Each mapper triggers a coprocessor scan on both source and + * target clusters. The {@link PhoenixSyncTableRegionScanner} coprocessor accumulates rows into + * chunks (configurable size, default 1GB) and computes an SHA-256 hash of all row data (keys + + * column families + qualifiers + timestamps + values).</li> + * <li><b>Hash Comparison:</b> The {@link PhoenixSyncTableMapper} receives chunk metadata (start + * key, end key, row count, hash) from both clusters and compares the hashes. 
Matching hashes mean + * the chunk data is identical; mismatched hashes indicate inconsistency.</li> + * <li><b>Result Tracking:</b> Results are check pointed to the {@code PHOENIX_SYNC_TABLE_OUTPUT} + * table, tracking verified chunks, mismatched chunks, and processing progress for resumable + * operations.</li> + * </ol> + * <h2>Usage Example</h2> + * + * <pre> + * hbase org.apache.phoenix.mapreduce.PhoenixSyncTableTool \ --table-name MY_TABLE \ + * --target-cluster target-zk1,target-zk2:2181:/hbase + * </pre> + */ +public class PhoenixSyncTableTool extends Configured implements Tool { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableTool.class); + + private static final Option SCHEMA_NAME_OPTION = + new Option("s", "schema", true, "Phoenix schema name (optional)"); + private static final Option TABLE_NAME_OPTION = + new Option("tn", "table-name", true, "Table name (mandatory)"); + private static final Option TARGET_CLUSTER_OPTION = + new Option("tc", "target-cluster", true, "Target cluster ZooKeeper quorum (mandatory)"); + private static final Option FROM_TIME_OPTION = new Option("ft", "from-time", true, + "Start time in milliseconds for sync (optional, defaults to 0)"); + private static final Option TO_TIME_OPTION = new Option("tt", "to-time", true, + "End time in milliseconds for sync (optional, defaults to current time - 1 hour)"); + private static final Option DRY_RUN_OPTION = new Option("dr", "dry-run", false, + "Dry run mode - only checkpoint inconsistencies, do not repair (optional)"); + private static final Option CHUNK_SIZE_OPTION = + new Option("cs", "chunk-size", true, "Chunk size in bytes (optional, defaults to 1GB)"); + private static final Option RUN_FOREGROUND_OPTION = new Option("runfg", "run-foreground", false, + "Run the job in foreground. 
Default - Runs the job in background."); + private static final Option TENANT_ID_OPTION = + new Option("tenant", "tenant-id", true, "Tenant ID for tenant-specific table sync (optional)"); + private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); + + private String schemaName; + private String tableName; + private String targetZkQuorum; + private Long startTime; + private Long endTime; + private boolean isDryRun; + private Long chunkSizeBytes; + private boolean isForeground; + private String tenantId; + + private String qTable; + private String qSchemaName; + + private Configuration configuration; + private Job job; + private PTable pTable; + + /** + * Creates an MR job that uses server-side chunking and checksum calculation + * @return Configured MapReduce job ready for submission + * @throws Exception if job creation fails + */ + private Job configureAndCreatePhoenixSyncTableJob(PTableType tableType) throws Exception { + configureTimeoutsAndRetries(configuration); + setPhoenixSyncTableToolConfiguration(configuration); + PhoenixMRJobUtil.updateCapacityQueueInfo(configuration); + Job job = Job.getInstance(configuration, getJobName()); + job.setMapperClass(PhoenixSyncTableMapper.class); + job.setJarByClass(PhoenixSyncTableTool.class); + TableMapReduceUtil.initCredentials(job); + TableMapReduceUtil.addDependencyJars(job); + configureInput(job, tableType); + configureOutput(job); + obtainTargetClusterTokens(job); + return job; + } + + /** + * Obtains HBase delegation tokens from the target cluster and adds them to the job. This is + * required for cross-cluster kerberos authentication. 
+ * @param job The MapReduce job to add tokens + */ + private void obtainTargetClusterTokens(Job job) throws IOException { + Configuration targetConf = + PhoenixMapReduceUtil.createConfigurationForZkQuorum(job.getConfiguration(), targetZkQuorum); + TableMapReduceUtil.initCredentialsForCluster(job, targetConf); + } + + /** + * Configures timeouts and retry settings for the sync job + */ + private void configureTimeoutsAndRetries(Configuration configuration) { + long syncTableQueryTimeoutMs = + configuration.getLong(QueryServices.SYNC_TABLE_QUERY_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_QUERY_TIMEOUT); + long syncTableRPCTimeoutMs = configuration.getLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT); + long syncTableClientScannerTimeoutMs = + configuration.getLong(QueryServices.SYNC_TABLE_CLIENT_SCANNER_TIMEOUT_ATTRIB, + QueryServicesOptions.DEFAULT_SYNC_TABLE_CLIENT_SCANNER_TIMEOUT); + int syncTableRpcRetriesCounter = + configuration.getInt(QueryServices.SYNC_TABLE_RPC_RETRIES_COUNTER, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_RETRIES_COUNTER); + + configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + Long.toString(syncTableClientScannerTimeoutMs)); + configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY, Long.toString(syncTableRPCTimeoutMs)); + configuration.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + Integer.toString(syncTableRpcRetriesCounter)); + configuration.set(MRJobConfig.TASK_TIMEOUT, Long.toString(syncTableQueryTimeoutMs)); + } + + private void setPhoenixSyncTableToolConfiguration(Configuration configuration) { + PhoenixConfigurationUtil.setPhoenixSyncTableName(configuration, qTable); + PhoenixConfigurationUtil.setPhoenixSyncTableTargetZkQuorum(configuration, targetZkQuorum); + PhoenixConfigurationUtil.setPhoenixSyncTableFromTime(configuration, startTime); + PhoenixConfigurationUtil.setPhoenixSyncTableToTime(configuration, endTime); + 
PhoenixConfigurationUtil.setPhoenixSyncTableDryRun(configuration, isDryRun); + PhoenixConfigurationUtil.setSplitByStats(configuration, false); + if (chunkSizeBytes != null) { + PhoenixConfigurationUtil.setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes); + } + if (tenantId != null) { + PhoenixConfigurationUtil.setTenantId(configuration, tenantId); + } + PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime); + configuration + .setBooleanIfUnset(PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true); + } + + private void configureInput(Job job, PTableType tableType) { + // With below query plan, we get Input split based on region boundary + String hint = (tableType == PTableType.INDEX) ? "" : "/*+ NO_INDEX */ "; + String selectStatement = "SELECT " + hint + "1 FROM " + qTable; + PhoenixMapReduceUtil.setInput(job, DBInputFormat.NullDBWritable.class, + PhoenixSyncTableInputFormat.class, qTable, selectStatement); + } + + private void configureOutput(Job job) { + job.setNumReduceTasks(0); + job.setOutputFormatClass(NullOutputFormat.class); + } + + private String getJobName() { + StringBuilder jobName = new StringBuilder("PhoenixSyncTableTool"); + if (qSchemaName != null) { + jobName.append("-").append(qSchemaName); + } + jobName.append("-").append(tableName); + jobName.append("-").append(System.currentTimeMillis()); + return jobName.toString(); + } + + public CommandLine parseOptions(String[] args) throws IllegalStateException { + Options options = getOptions(); + CommandLineParser parser = DefaultParser.builder().setAllowPartialMatching(false) + .setStripLeadingAndTrailingQuotes(false).build(); + CommandLine cmdLine = null; + try { + cmdLine = parser.parse(options, args); + } catch (ParseException e) { + LOGGER.error("Failed to parse command line options. Args: {}. 
Error: {}", + Arrays.toString(args), e.getMessage(), e); + printHelpAndExit("Error parsing command line options: " + e.getMessage(), options); + } + + if (cmdLine.hasOption(HELP_OPTION.getOpt())) { + printHelpAndExit(options, 0); + } + requireOption(cmdLine, TABLE_NAME_OPTION); + requireOption(cmdLine, TARGET_CLUSTER_OPTION); + return cmdLine; + } + + private void requireOption(CommandLine cmdLine, Option option) { + if (!cmdLine.hasOption(option.getOpt())) { + throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter"); + } + } + + private Options getOptions() { + Options options = new Options(); + options.addOption(SCHEMA_NAME_OPTION); + options.addOption(TABLE_NAME_OPTION); + options.addOption(TARGET_CLUSTER_OPTION); + options.addOption(FROM_TIME_OPTION); + options.addOption(TO_TIME_OPTION); + options.addOption(DRY_RUN_OPTION); + options.addOption(CHUNK_SIZE_OPTION); + options.addOption(RUN_FOREGROUND_OPTION); + options.addOption(TENANT_ID_OPTION); + options.addOption(HELP_OPTION); + return options; + } + + private void printHelpAndExit(String errorMessage, Options options) { + System.err.println(errorMessage); + printHelpAndExit(options, -1); + } + + private void printHelpAndExit(Options options, int exitCode) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("hadoop jar phoenix-server.jar " + PhoenixSyncTableTool.class.getName(), + "Synchronize a Phoenix table between source and target clusters", options, + "\nExample usage:\n" + + "hadoop jar phoenix-server.jar org.apache.phoenix.mapreduce.PhoenixSyncTableTool \\\n" + + " --table-name MY_TABLE \\\n" + " --target-cluster <zk_quorum>:2181 \\\n" + + " --dry-run\n", + true); + System.exit(exitCode); + } + + public void populateSyncTableToolAttributes(CommandLine cmdLine) { + tableName = cmdLine.getOptionValue(TABLE_NAME_OPTION.getOpt()); + targetZkQuorum = cmdLine.getOptionValue(TARGET_CLUSTER_OPTION.getOpt()); + schemaName = 
cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); + + if (cmdLine.hasOption(FROM_TIME_OPTION.getOpt())) { + startTime = Long.valueOf(cmdLine.getOptionValue(FROM_TIME_OPTION.getOpt())); + } else { + startTime = 0L; + } + + if (cmdLine.hasOption(TO_TIME_OPTION.getOpt())) { + endTime = Long.valueOf(cmdLine.getOptionValue(TO_TIME_OPTION.getOpt())); + } else { + // Default endTime, current time - 1 hour + endTime = EnvironmentEdgeManager.currentTimeMillis() - (60 * 60 * 1000); + } + + if (cmdLine.hasOption(CHUNK_SIZE_OPTION.getOpt())) { + chunkSizeBytes = Long.valueOf(cmdLine.getOptionValue(CHUNK_SIZE_OPTION.getOpt())); + if (chunkSizeBytes <= 0) { + throw new IllegalArgumentException( + "Chunk size must be a positive value, got: " + chunkSizeBytes); + } + } + if (cmdLine.hasOption(TENANT_ID_OPTION.getOpt())) { + tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt()); + } + isDryRun = cmdLine.hasOption(DRY_RUN_OPTION.getOpt()); + isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); + qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName); + qSchemaName = SchemaUtil.normalizeIdentifier(schemaName); + PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable); + PhoenixMapReduceUtil.validateMaxLookbackAge(configuration, endTime, qTable); Review Comment: On the other hand, we should not only enforce that it is not outside the window, we should also enforce a "safety buffer" to accommodate the data in flight. Even when the endTime is with in the window, if it is too close to the current time, it may miss the data that is still in flight and may cause false positives. In practice, this may not matter as the time it takes to setup and run could be in the order of several minutes and so enough for the catch up to complete, but I think it is better to make it explicit by enforcing a safety buffer and make this more deterministic. 
If we remove this check and allow the endTime to be in the future, the possibility of having false positives due to the data in flight becomes a lot more pronounced. By enforcing both startTime and endTime, we can ensure a "consistent window" where data is guaranteed to be fully replicated and 'quiesced' on both sides. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
