tkhurana commented on code in PR #2379: URL: https://github.com/apache/phoenix/pull/2379#discussion_r2985132197
########## phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java: ########## @@ -0,0 +1,723 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES; + +import java.io.IOException; +import java.security.MessageDigest; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.MetaDataUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.SHA256DigestUtil; +import org.apache.phoenix.util.ScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Mapper that acts as a driver for validating table data between source and target clusters. The + * actual work of chunking and hashing is done server-side by the coprocessor. This mapper fetches + * chunk hashes from both clusters, compares them and write to checkpoint table. + */ +public class PhoenixSyncTableMapper + extends Mapper<NullWritable, DBInputFormat.NullDBWritable, NullWritable, NullWritable> { + + private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixSyncTableMapper.class); + + public enum SyncCounters { + CHUNKS_VERIFIED, + CHUNKS_MISMATCHED, + SOURCE_ROWS_PROCESSED, + TARGET_ROWS_PROCESSED + } + + private String tableName; + private String targetZkQuorum; + private Long fromTime; + private Long toTime; + private boolean isDryRun; + private long chunkSizeBytes; + private Configuration conf; + private Connection sourceConnection; + private Connection targetConnection; + private Connection globalConnection; + private PTable pTable; + private byte[] physicalTableName; + private byte[] mapperRegionStart; + private byte[] mapperRegionEnd; + private PhoenixSyncTableOutputRepository syncTableOutputRepository; + private Timestamp mapperStartTime; + + @Override + protected void setup(Context context) throws InterruptedException { + try { + super.setup(context); + mapperStartTime = new Timestamp(System.currentTimeMillis()); + this.conf = context.getConfiguration(); + tableName = PhoenixConfigurationUtil.getPhoenixSyncTableName(conf); + targetZkQuorum = PhoenixConfigurationUtil.getPhoenixSyncTableTargetZkQuorum(conf); + fromTime = PhoenixConfigurationUtil.getPhoenixSyncTableFromTime(conf); + toTime = PhoenixConfigurationUtil.getPhoenixSyncTableToTime(conf); + isDryRun = PhoenixConfigurationUtil.getPhoenixSyncTableDryRun(conf); + chunkSizeBytes = PhoenixConfigurationUtil.getPhoenixSyncTableChunkSizeBytes(conf); + extractRegionBoundariesFromSplit(context); + sourceConnection = ConnectionUtil.getInputConnection(conf); + pTable = sourceConnection.unwrap(PhoenixConnection.class).getTable(tableName); + physicalTableName = pTable.getPhysicalName().getBytes(); + connectToTargetCluster(); + globalConnection = createGlobalConnection(conf); + syncTableOutputRepository = new PhoenixSyncTableOutputRepository(globalConnection); + } catch (SQLException | IOException e) { + tryClosingResources(); + throw new RuntimeException( + String.format("Failed to setup PhoenixSyncTableMapper for table: %s", tableName), e); + } + } + + /** + * Extracts mapper region boundaries from the PhoenixInputSplit + */ + private void extractRegionBoundariesFromSplit(Context context) { + PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit(); + KeyRange keyRange = split.getKeyRange(); + if (keyRange == null) { + throw new IllegalStateException(String.format( + "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine region boundaries for sync operation.", + tableName)); + } + mapperRegionStart = keyRange.getLowerRange(); + mapperRegionEnd = keyRange.getUpperRange(); + } + + /** + * Connects to the target cluster using the target ZK quorum, port, znode, krb principal + */ + private void connectToTargetCluster() throws SQLException, IOException { + Configuration targetConf = + PhoenixMapReduceUtil.createConfigurationForZkQuorum(conf, targetZkQuorum); + targetConnection = ConnectionUtil.getInputConnection(targetConf); + } + + /** + * Creates a global (non-tenant) connection for the checkpoint table. + */ + private Connection createGlobalConnection(Configuration conf) throws SQLException { + Configuration globalConf = new Configuration(conf); + globalConf.unset(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID); + globalConf.unset(PhoenixRuntime.CURRENT_SCN_ATTRIB); + return ConnectionUtil.getInputConnection(globalConf); + } + + /** + * Processes a mapper region by comparing chunks between source and target clusters. Gets already + * processed chunks from checkpoint table, resumes from check pointed progress and records final + * status for chunks & mapper (VERIFIED/MISMATCHED). + */ + @Override + protected void map(NullWritable key, DBInputFormat.NullDBWritable value, Context context) + throws IOException, InterruptedException { + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + try { + List<PhoenixSyncTableOutputRow> processedChunks = + syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, fromTime, toTime, + mapperRegionStart, mapperRegionEnd); + List<Pair<byte[], byte[]>> unprocessedRanges = + calculateUnprocessedRanges(mapperRegionStart, mapperRegionEnd, processedChunks); + boolean isStartKeyInclusive = shouldStartKeyBeInclusive(mapperRegionStart, processedChunks); + for (Pair<byte[], byte[]> range : unprocessedRanges) { + processMapperRanges(range.getFirst(), range.getSecond(), isStartKeyInclusive, context); + isStartKeyInclusive = false; + } + + long mismatchedChunk = context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue(); + long verifiedChunk = context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue(); + long sourceRowsProcessed = context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue(); + long targetRowsProcessed = context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue(); + Timestamp mapperEndTime = new Timestamp(System.currentTimeMillis()); + String counters = formatMapperCounters(verifiedChunk, mismatchedChunk, sourceRowsProcessed, + targetRowsProcessed); + + if (sourceRowsProcessed > 0) { + if (mismatchedChunk == 0) { + context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1); + syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum, + PhoenixSyncTableOutputRow.Type.MAPPER_REGION, fromTime, toTime, isDryRun, + mapperRegionStart, mapperRegionEnd, PhoenixSyncTableOutputRow.Status.VERIFIED, + mapperStartTime, mapperEndTime, counters); + LOGGER.info( + "PhoenixSyncTable mapper completed with verified: {} verified chunks, {} mismatched chunks", + verifiedChunk, mismatchedChunk); + } else { + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + LOGGER.warn( + "PhoenixSyncTable mapper completed with mismatch: {} verified chunks, {} mismatched chunks", + verifiedChunk, mismatchedChunk); + syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum, + PhoenixSyncTableOutputRow.Type.MAPPER_REGION, fromTime, toTime, isDryRun, + mapperRegionStart, mapperRegionEnd, PhoenixSyncTableOutputRow.Status.MISMATCHED, + mapperStartTime, mapperEndTime, counters); + } + } else { + LOGGER.info( + "No rows pending to process. All mapper region boundaries are covered for startKey:{}, endKey: {}", + mapperRegionStart, mapperRegionEnd); + } + } catch (SQLException e) { + tryClosingResources(); + throw new RuntimeException("Error processing PhoenixSyncTableMapper", e); + } + } + + /** + * Processes a chunk range by comparing source and target cluster data. Source chunking: Breaks + * data into size-based chunks within given mapper region boundary. Target chunking: Follows + * source chunk boundaries. Source chunk boundary might be split across multiple target region, if + * so corpoc signals for partial chunk with partial digest. Once entire Source chunk is covered by + * target scanner, we calculate resulting checksum from combined digest. + * @param rangeStart Range start key + * @param rangeEnd Range end key + * @param isSourceStartKeyInclusive Whether startKey be inclusive for source chunking + * @param context Mapper context for progress and counters + * @throws IOException if scan fails + * @throws SQLException if database operations fail + */ + private void processMapperRanges(byte[] rangeStart, byte[] rangeEnd, + boolean isSourceStartKeyInclusive, Context context) throws IOException, SQLException { + // To handle scenario of target having extra keys compared to source keys: + // For every source chunk, we track whether its first chunk of Region or whether its lastChunk + // of region + // For every source chunk, we issue scan on target with + // - FirstChunkOfRegion : target scan start boundary would be rangeStart + // - LastChunkOfRegion : target scan end boundary would be rangeEnd + // - notFirstChunkOfRegion: target scan start boundary would be previous source chunk endKey + // - notLastChunkOfRegion: target scan end boundary would be current source chunk endKey + // Lets understand with an example. + // Source region boundary is [c,n) and source chunk returns [c1,d] , here `c` key is not present + // in source + // It could be the case that target has `c` present, so we issue scan on target chunk with + // startKey as `c` and not `c1` i.e [c,d] + // Similarly, if two consecutive source chunk returns its boundary as [e,g] and [h,j] + // When target is scanning for [h,j], it would issue scan with (g,j] to ensure we cover any + // extra key which is not in source but present in target + // + // Now eventually when chunking will reach for last source chunk on this region boundary, we + // again pass rangeEnd(with Exclusive) as target chunk boundary. + // Lets say, for above region boundary example second last and last sourceChunk returns [j,k] + // and [l,m]. Target chunk would issue scan for last chunk (k,n) + boolean isLastChunkOfRegion = false; + // We only want target startKey to be inclusive if source startKey is inclusive as well + // Source start key won't be inclusive if start of region boundary is already processed as chunk + // and check pointed + // Refer to shouldStartKeyBeInclusive() method to understand more about when source start key + // would be exclusive + boolean isTargetStartKeyInclusive = isSourceStartKeyInclusive; + try (ChunkScannerContext sourceScanner = createChunkScanner(sourceConnection, rangeStart, + rangeEnd, null, isSourceStartKeyInclusive, false, false)) { + ChunkInfo previousSourceChunk = null; + ChunkInfo sourceChunk = sourceScanner.getNextChunk(); + while (sourceChunk != null) { + sourceChunk.executionStartTime = new Timestamp(System.currentTimeMillis()); + // Peek ahead to see if this is the last chunk + ChunkInfo nextSourceChunk = sourceScanner.getNextChunk(); + if (nextSourceChunk == null) { + isLastChunkOfRegion = true; + } + ChunkInfo targetChunk = getTargetChunkWithSourceBoundary(targetConnection, + previousSourceChunk == null ? rangeStart : previousSourceChunk.endKey, + isLastChunkOfRegion ? rangeEnd : sourceChunk.endKey, isTargetStartKeyInclusive, + !isLastChunkOfRegion); + context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).increment(sourceChunk.rowCount); + context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).increment(targetChunk.rowCount); + boolean matched = MessageDigest.isEqual(sourceChunk.hash, targetChunk.hash); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "isSourceStartKeyInclusive: {}, isTargetStartKeyInclusive: {}," + + "isTargetEndKeyInclusive: {}, isFirstChunkOfRegion: {}, isLastChunkOfRegion: {}." + + "Chunk comparison source {}, {}. Key range passed to target chunk: {}, {}." + + "target chunk returned {}, {}: source={} rows, target={} rows, matched={}", + isSourceStartKeyInclusive, isTargetStartKeyInclusive, !isLastChunkOfRegion, + previousSourceChunk == null, isLastChunkOfRegion, + Bytes.toStringBinary(sourceChunk.startKey), Bytes.toStringBinary(sourceChunk.endKey), + Bytes.toStringBinary( + previousSourceChunk == null ? rangeStart : previousSourceChunk.endKey), + Bytes.toStringBinary(isLastChunkOfRegion ? rangeEnd : sourceChunk.endKey), + Bytes.toStringBinary(targetChunk.startKey), Bytes.toStringBinary(targetChunk.endKey), + sourceChunk.rowCount, targetChunk.rowCount, matched); + } + sourceChunk.executionEndTime = new Timestamp(System.currentTimeMillis()); + String counters = formatChunkCounters(sourceChunk.rowCount, targetChunk.rowCount); + if (matched) { + handleVerifiedChunk(sourceChunk, context, counters); + } else { + handleMismatchedChunk(sourceChunk, context, counters); + } + previousSourceChunk = sourceChunk; + sourceChunk = nextSourceChunk; + // After first chunk, our target chunk boundary would be previousSourceChunk.endKey, + // so start key should not be inclusive + isTargetStartKeyInclusive = false; + context.progress(); + } + } + LOGGER.info("Completed sync table processing of Mapper region boundary {}, {}", + Bytes.toStringBinary(rangeStart), Bytes.toStringBinary(rangeEnd)); + } + + /** + * Scans target across multiple regions and returns a single combined ChunkInfo. Handles partial + * chunks by passing digest state to next scanner via scan attributes, enabling cross-region + * digest continuation. Since we are scanning rows based on source chunk boundary, it could be + * distributed across multiple target regions. We keep on creating scanner across target region + * until entire source chunk boundary is processed or chunk is null + * @param conn Target connection + * @param startKey Source chunk start key + * @param endKey Source chunk end key + * @return Single ChunkInfo with final hash from all target regions + */ + private ChunkInfo getTargetChunkWithSourceBoundary(Connection conn, byte[] startKey, + byte[] endKey, boolean isTargetStartKeyInclusive, boolean isTargetEndKeyInclusive) + throws IOException, SQLException { + ChunkInfo combinedTargetChunk = new ChunkInfo(); + combinedTargetChunk.startKey = null; + combinedTargetChunk.endKey = null; + combinedTargetChunk.hash = null; + combinedTargetChunk.rowCount = 0; + byte[] currentStartKey = startKey; + byte[] continuedDigestState = null; + ChunkInfo chunk; + while (true) { + // Each iteration scans one target region. The coprocessor processes all rows in + // that region within the scan range. For target boundary, the chunk is always + // marked partial and the digest state is passed to the next + // scanner for cross-region hash continuation. + try (ChunkScannerContext scanner = createChunkScanner(conn, currentStartKey, endKey, + continuedDigestState, isTargetStartKeyInclusive, isTargetEndKeyInclusive, true)) { + chunk = scanner.getNextChunk(); + // chunk == null means no more rows in the target range. + // We must finalize the digest to produce a proper checksum for comparison. + if (chunk == null) { + if (continuedDigestState != null) { + combinedTargetChunk.hash = + SHA256DigestUtil.finalizeDigestToChecksum(continuedDigestState); + } + break; + } + if (combinedTargetChunk.startKey == null) { + combinedTargetChunk.startKey = chunk.startKey; + } + combinedTargetChunk.endKey = chunk.endKey; + combinedTargetChunk.rowCount += chunk.rowCount; + continuedDigestState = chunk.hash; + currentStartKey = chunk.endKey; + isTargetStartKeyInclusive = false; + } + } + return combinedTargetChunk; + } + + /** + * Creates a reusable scanner context for fetching chunks from a range. + * @param conn Connection to cluster (source or target) + * @param startKey Range start key (inclusive) + * @param endKey Range end key (exclusive) + * @param continuedDigestState If not null, coprocessor will continue hashing from this state (for + * cross-region continuation on target) + * @param isStartKeyInclusive Whether StartKey Inclusive + * @param isEndKeyInclusive Whether EndKey Inclusive + * @throws IOException scanner creation fails + * @throws SQLException hTable connection fails + */ + private ChunkScannerContext createChunkScanner(Connection conn, byte[] startKey, byte[] endKey, + byte[] continuedDigestState, boolean isStartKeyInclusive, boolean isEndKeyInclusive, + boolean isTargetScan) throws IOException, SQLException { + // Not using try-with-resources since ChunkScannerContext owns the table lifecycle + Table hTable = + conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(physicalTableName); + Scan scan = + createChunkScan(startKey, endKey, isStartKeyInclusive, isEndKeyInclusive, isTargetScan); + scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_FORMATION, TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserverConstants.SKIP_REGION_BOUNDARY_CHECK, TRUE_BYTES); + scan.setAttribute(BaseScannerRegionObserverConstants.UNGROUPED_AGG, TRUE_BYTES); + if (continuedDigestState != null && continuedDigestState.length > 0) { + scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CONTINUED_DIGEST_STATE, + continuedDigestState); + } + + if (!isTargetScan) { + scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES, + Bytes.toBytes(chunkSizeBytes)); + } + long syncTablePageTimeoutMs = (long) (conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, + QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT) * 0.5); + scan.setAttribute(BaseScannerRegionObserverConstants.SERVER_PAGE_SIZE_MS, + Bytes.toBytes(syncTablePageTimeoutMs)); + ResultScanner scanner = hTable.getScanner(scan); + return new ChunkScannerContext(hTable, scanner); + } + + /** + * Parses chunk information from the coprocessor result. The PhoenixSyncTableRegionScanner returns + * cells with chunk metadata including SHA-256 hash (for complete chunks) or MessageDigest state + * (for partial chunks). + */ + private ChunkInfo parseChunkInfo(Result result) { + List<Cell> cells = Arrays.asList(result.rawCells()); + Cell startKeyCell = MetaDataUtil.getCell(cells, + BaseScannerRegionObserverConstants.SYNC_TABLE_START_KEY_QUALIFIER); + Cell rowCountCell = MetaDataUtil.getCell(cells, + BaseScannerRegionObserverConstants.SYNC_TABLE_ROW_COUNT_QUALIFIER); + Cell isPartialChunkCell = MetaDataUtil.getCell(cells, + BaseScannerRegionObserverConstants.SYNC_TABLE_IS_PARTIAL_CHUNK_QUALIFIER); + Cell hashCell = + MetaDataUtil.getCell(cells, BaseScannerRegionObserverConstants.SYNC_TABLE_HASH_QUALIFIER); + + if ( + startKeyCell == null || rowCountCell == null || isPartialChunkCell == null || hashCell == null + ) { + throw new RuntimeException("Missing required chunk metadata cells."); + } + + ChunkInfo info = new ChunkInfo(); + info.startKey = CellUtil.cloneValue(startKeyCell); + info.endKey = result.getRow(); + info.rowCount = Bytes.toLong(rowCountCell.getValueArray(), rowCountCell.getValueOffset(), + rowCountCell.getValueLength()); + info.isPartial = isPartialChunkCell.getValueArray()[isPartialChunkCell.getValueOffset()] != 0; + info.hash = CellUtil.cloneValue(hashCell); + return info; + } + + /** + * Formats chunk counters as a comma-separated string. + * @param sourceRows Source rows processed + * @param targetRows Target rows processed + * @return Formatted string: "SOURCE_ROWS_PROCESSED=123,TARGET_ROWS_PROCESSED=456" + */ + private String formatChunkCounters(long sourceRows, long targetRows) { + return String.format("%s=%d,%s=%d", SyncCounters.SOURCE_ROWS_PROCESSED.name(), sourceRows, + SyncCounters.TARGET_ROWS_PROCESSED.name(), targetRows); + } + + /** + * Formats mapper counters as a comma-separated string. + * @param chunksVerified Chunks verified count + * @param chunksMismatched Chunks mismatched count + * @param sourceRows Source rows processed + * @param targetRows Target rows processed + * @return Formatted string with all mapper counters + */ + private String formatMapperCounters(long chunksVerified, long chunksMismatched, long sourceRows, + long targetRows) { + return String.format("%s=%d,%s=%d,%s=%d,%s=%d", SyncCounters.CHUNKS_VERIFIED.name(), + chunksVerified, SyncCounters.CHUNKS_MISMATCHED.name(), chunksMismatched, + SyncCounters.SOURCE_ROWS_PROCESSED.name(), sourceRows, + SyncCounters.TARGET_ROWS_PROCESSED.name(), targetRows); + } + + private void handleVerifiedChunk(ChunkInfo sourceChunk, Context context, String counters) + throws SQLException { + syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum, + PhoenixSyncTableOutputRow.Type.CHUNK, fromTime, toTime, isDryRun, sourceChunk.startKey, + sourceChunk.endKey, PhoenixSyncTableOutputRow.Status.VERIFIED, sourceChunk.executionStartTime, + sourceChunk.executionEndTime, counters); + context.getCounter(SyncCounters.CHUNKS_VERIFIED).increment(1); + } + + private void handleMismatchedChunk(ChunkInfo sourceChunk, Context context, String counters) + throws SQLException { + LOGGER.warn("Chunk mismatch detected for table: {}, with startKey: {}, endKey {}", tableName, + Bytes.toStringBinary(sourceChunk.startKey), Bytes.toStringBinary(sourceChunk.endKey)); + syncTableOutputRepository.checkpointSyncTableResult(tableName, targetZkQuorum, + PhoenixSyncTableOutputRow.Type.CHUNK, fromTime, toTime, isDryRun, sourceChunk.startKey, + sourceChunk.endKey, PhoenixSyncTableOutputRow.Status.MISMATCHED, + sourceChunk.executionStartTime, sourceChunk.executionEndTime, counters); + + context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1); + } + + /** + * Creates a Hbase raw scan for a chunk range to capture all cell versions and delete markers. + */ + private Scan createChunkScan(byte[] startKey, byte[] endKey, boolean isStartKeyInclusive, + boolean isEndKeyInclusive, boolean isTargetScan) throws IOException { + Scan scan = new Scan(); + scan.withStartRow(startKey, isStartKeyInclusive); + scan.withStopRow(endKey, isEndKeyInclusive); + scan.setRaw(true); Review Comment: Are we sure we have to do raw scan ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
