tkhurana commented on code in PR #2379: URL: https://github.com/apache/phoenix/pull/2379#discussion_r2968555490
########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableOutputRepository.java: ########## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Status; +import org.apache.phoenix.mapreduce.PhoenixSyncTableOutputRow.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Repository for managing the PHOENIX_SYNC_TABLE_CHECKPOINT table. This table stores checkpoint + * information for the PhoenixSyncTableTool, enabling: 1. Mapper Level checkpointing (skip completed + * mapper regions on restart) 2. 
Chunk level checkpointing (skip completed chunks) + */ +public class PhoenixSyncTableOutputRepository { + + private static final Logger LOGGER = + LoggerFactory.getLogger(PhoenixSyncTableOutputRepository.class); + public static final String SYNC_TABLE_CHECKPOINT_TABLE_NAME = "PHOENIX_SYNC_TABLE_CHECKPOINT"; + private static final int OUTPUT_TABLE_TTL_SECONDS = 90 * 24 * 60 * 60; // 90 days + private final Connection connection; + private static final String UPSERT_CHECKPOINT_SQL = "UPSERT INTO " + + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (TABLE_NAME, TARGET_CLUSTER, TYPE, FROM_TIME, TO_TIME," + + " START_ROW_KEY, END_ROW_KEY, IS_DRY_RUN, EXECUTION_START_TIME, EXECUTION_END_TIME," + + " STATUS, COUNTERS) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + + /** + * Creates a repository for managing sync table checkpoint operations. Note: The connection is + * stored as-is and shared across operations. The caller retains ownership and is responsible for + * connection lifecycle. + * @param connection Phoenix connection (must remain open for repository lifetime) + */ + public PhoenixSyncTableOutputRepository(Connection connection) { + this.connection = connection; + } + + public void createSyncCheckpointTableIfNotExists() throws SQLException { + String ddl = "CREATE TABLE IF NOT EXISTS " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + " (\n" + + " TABLE_NAME VARCHAR NOT NULL,\n" + " TARGET_CLUSTER VARCHAR NOT NULL,\n" + + " TYPE VARCHAR(20) NOT NULL,\n" + " FROM_TIME BIGINT NOT NULL,\n" + + " TO_TIME BIGINT NOT NULL,\n" + " START_ROW_KEY VARBINARY_ENCODED,\n" + + " END_ROW_KEY VARBINARY_ENCODED,\n" + " IS_DRY_RUN BOOLEAN, \n" + + " EXECUTION_START_TIME TIMESTAMP,\n" + " EXECUTION_END_TIME TIMESTAMP,\n" + + " STATUS VARCHAR(20),\n" + " COUNTERS VARCHAR(255), \n" + + " CONSTRAINT PK PRIMARY KEY (\n" + " TABLE_NAME,\n" + " TARGET_CLUSTER,\n" + + " TYPE ,\n" + " FROM_TIME,\n" + " TO_TIME,\n" + + " START_ROW_KEY )" + ") TTL=" + OUTPUT_TABLE_TTL_SECONDS; + + try (Statement stmt = 
connection.createStatement()) { + stmt.execute(ddl); + connection.commit(); + LOGGER.info("Successfully created or verified existence of {} table", + SYNC_TABLE_CHECKPOINT_TABLE_NAME); + } + } + + public void checkpointSyncTableResult(String tableName, String targetCluster, Type type, + Long fromTime, Long toTime, boolean isDryRun, byte[] startKey, byte[] endKey, Status status, + Timestamp executionStartTime, Timestamp executionEndTime, String counters) throws SQLException { + + // Validate required parameters + if (tableName == null || tableName.isEmpty()) { + throw new IllegalArgumentException("TableName cannot be null or empty for checkpoint"); + } + if (targetCluster == null || targetCluster.isEmpty()) { + throw new IllegalArgumentException("TargetCluster cannot be null or empty for checkpoint"); + } + if (type == null) { + throw new IllegalArgumentException("Type cannot be null for checkpoint"); + } + if (fromTime == null || toTime == null) { + throw new IllegalArgumentException("FromTime and ToTime cannot be null for checkpoint"); + } + + try (PreparedStatement ps = connection.prepareStatement(UPSERT_CHECKPOINT_SQL)) { + ps.setString(1, tableName); + ps.setString(2, targetCluster); + ps.setString(3, type.name()); + ps.setLong(4, fromTime); + ps.setLong(5, toTime); + ps.setBytes(6, startKey); + ps.setBytes(7, endKey); + ps.setBoolean(8, isDryRun); + ps.setTimestamp(9, executionStartTime); + ps.setTimestamp(10, executionEndTime); + ps.setString(11, status != null ? status.name() : null); + ps.setString(12, counters); + ps.executeUpdate(); + connection.commit(); + } + } + + /** + * Queries for completed mapper regions. Used by PhoenixSyncTableInputFormat to filter out + * already-processed regions. 
+ * @param tableName Source table name + * @param targetCluster Target cluster ZK quorum + * @param fromTime Start timestamp (nullable) + * @param toTime End timestamp (nullable) + * @return List of completed mapper regions + */ + public List<PhoenixSyncTableOutputRow> getProcessedMapperRegions(String tableName, + String targetCluster, Long fromTime, Long toTime) throws SQLException { + + String query = "SELECT START_ROW_KEY, END_ROW_KEY FROM " + SYNC_TABLE_CHECKPOINT_TABLE_NAME + + " WHERE TABLE_NAME = ? AND TARGET_CLUSTER = ?" + + " AND TYPE = ? AND FROM_TIME = ? AND TO_TIME = ? AND STATUS IN ( ?, ?)"; Review Comment: There are only two possible statuses, so does it make sense to set them in the query? If you don't, then you are only querying PK columns without any filter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
