This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 26444e0ccc PHOENIX-7677 TTL_DELETE CDC event to use batch mutation
(#2247)
26444e0ccc is described below
commit 26444e0cccb4bf13fa95ec130ecf07b7573fe635
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue Jul 29 21:51:39 2025 -0700
PHOENIX-7677 TTL_DELETE CDC event to use batch mutation (#2247)
---
.../org/apache/phoenix/query/QueryServices.java | 6 +
.../apache/phoenix/query/QueryServicesOptions.java | 8 +-
.../phoenix/coprocessor/CDCCompactionUtil.java | 452 ++++++++++++---------
.../phoenix/coprocessor/CompactionScanner.java | 36 +-
.../org/apache/phoenix/end2end/TableTTLIT.java | 126 ++++++
.../phoenix/schema/ConditionalTTLExpressionIT.java | 20 +
.../java/org/apache/phoenix/util/TestUtil.java | 13 +-
7 files changed, 454 insertions(+), 207 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index 94f8574361..e3f494897d 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -503,6 +503,12 @@ public interface QueryServices extends SQLCloseable {
// CDC TTL mutation retry configuration
String CDC_TTL_MUTATION_MAX_RETRIES = "phoenix.cdc.ttl.mutation.max.retries";
+ // CDC TTL mutation batch size configuration
+ String CDC_TTL_MUTATION_BATCH_SIZE = "phoenix.cdc.ttl.mutation.batch.size";
+
+ // CDC TTL shared cache expiration time in seconds
+ String CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS =
"phoenix.cdc.ttl.shared.cache.expiry.seconds";
+
// This config is used to move (copy and delete) the child links from the
SYSTEM.CATALOG to
// SYSTEM.CHILD_LINK table.
// As opposed to a copy and async (out of band) delete.
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 16c1c28709..f6f44c23c1 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,7 +24,9 @@ import static
org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE
import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
import static
org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
import static
org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_BATCH_SIZE;
import static
org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_MAX_RETRIES;
+import static
org.apache.phoenix.query.QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS;
import static
org.apache.phoenix.query.QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD;
import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
import static
org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
@@ -493,6 +495,8 @@ public class QueryServicesOptions {
public static final Boolean
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = false;
public static final int DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES = 5;
+ public static final int DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE = 50;
+ public static final int DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS = 1200;
public static final long
DEFAULT_PHOENIX_CDC_STREAM_PARTITION_EXPIRY_MIN_AGE_MS =
30 * 60 * 60 * 1000; // 30 hours
@@ -613,7 +617,9 @@ public class QueryServicesOptions {
.setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)
.setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED,
DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)
- .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES,
DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
+ .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES,
DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES)
+ .setIfUnset(CDC_TTL_MUTATION_BATCH_SIZE,
DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE)
+ .setIfUnset(CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS,
DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
index d078a3fae6..9921284591 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
@@ -20,18 +20,17 @@ package org.apache.phoenix.coprocessor;
import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.CheckAndMutate;
-import org.apache.hadoop.hbase.client.CheckAndMutateResult;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -43,6 +42,8 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDate;
@@ -53,19 +54,229 @@ import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
+import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
+
/**
* Utility class for CDC (Change Data Capture) operations during compaction.
This class contains
- * utilities for handling TTL row expiration events and generating CDC events
with pre-image data
- * that are written directly to CDC index tables.
+ * utilities for handling TTL row expiration events and generating CDC events
with pre-image data.
+ * CDC mutations are accumulated during compaction and written to CDC index
tables in batches only
+ * when compaction completes.
*/
public final class CDCCompactionUtil {
private static final Logger LOGGER =
LoggerFactory.getLogger(CDCCompactionUtil.class);
+ // Shared cache for row images across all CompactionScanner instances in the
JVM.
+ // Entries expire after 1200 seconds (20 minutes) by default.
+ // The JVM level cache helps merge the pre-image for the row with multiple
CFs.
+ // The key of the cache contains (regionId + data table rowkey).
+ // The value contains pre-image that needs to be directly inserted in the
CDC index.
+ private static volatile Cache<ImmutableBytesPtr, Map<String, Object>>
sharedTtlImageCache;
+
private CDCCompactionUtil() {
// empty
}
+ /**
+ * Gets the shared row image cache, initializing it lazily with
configuration.
+ * @param config The Hadoop configuration to read cache expiry from
+ * @return the shared cache instance
+ */
+ static Cache<ImmutableBytesPtr, Map<String, Object>>
+ getSharedRowImageCache(Configuration config) {
+ if (sharedTtlImageCache == null) {
+ synchronized (CDCCompactionUtil.class) {
+ if (sharedTtlImageCache == null) {
+ int expirySeconds =
config.getInt(QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS,
+ QueryServicesOptions.DEFAULT_CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS);
+ sharedTtlImageCache =
+ CacheBuilder.newBuilder().expireAfterWrite(expirySeconds,
TimeUnit.SECONDS).build();
+ LOGGER.info("Initialized shared CDC row image cache with expiry of
{} seconds",
+ expirySeconds);
+ }
+ }
+ }
+ return sharedTtlImageCache;
+ }
+
+ /**
+ * Batch processor for CDC mutations during compaction. This class
accumulates all mutations
+ * during the compaction operation and writes them to the CDC index in
batches only when the
+ * compaction is complete.
+ */
+ public static class CDCBatchProcessor {
+
+ private final Map<ImmutableBytesPtr, Put> pendingMutations;
+ private final PTable cdcIndex;
+ private final PTable dataTable;
+ private final RegionCoprocessorEnvironment env;
+ private final Region region;
+ private final byte[] compactionTimeBytes;
+ private final long eventTimestamp;
+ private final String tableName;
+ private final int cdcTtlMutationMaxRetries;
+ private final int batchSize;
+ private final Configuration config;
+
+ public CDCBatchProcessor(PTable cdcIndex, PTable dataTable,
RegionCoprocessorEnvironment env,
+ Region region, byte[] compactionTimeBytes, long eventTimestamp, String
tableName,
+ int cdcTtlMutationMaxRetries, int batchSize) {
+ this.pendingMutations = new HashMap<>();
+ this.cdcIndex = cdcIndex;
+ this.dataTable = dataTable;
+ this.env = env;
+ this.region = region;
+ this.compactionTimeBytes = compactionTimeBytes;
+ this.eventTimestamp = eventTimestamp;
+ this.tableName = tableName;
+ this.cdcTtlMutationMaxRetries = cdcTtlMutationMaxRetries;
+ this.batchSize = batchSize;
+ this.config = env.getConfiguration();
+ }
+
+ /**
+ * Adds a CDC event for the specified expired row. If the row already
exists in memory, merges
+ * the image with the existing image. Accumulates mutations in memory for
batch processing
+ * during close() instead of immediately writing to the CDC index.
+ * @param expiredRow The expired row.
+ * @throws Exception If something goes wrong.
+ */
+ public void addCDCEvent(List<Cell> expiredRow) throws Exception {
+ Cell firstCell = expiredRow.get(0);
+ byte[] dataRowKey = CellUtil.cloneRow(firstCell);
+
+ Put expiredRowPut = new Put(dataRowKey);
+ for (Cell cell : expiredRow) {
+ expiredRowPut.add(cell);
+ }
+
+ IndexMaintainer cdcIndexMaintainer;
+ // rowKey for the Index mutation
+ byte[] rowKey;
+ try (PhoenixConnection serverConnection =
+ QueryUtil.getConnectionOnServer(new Properties(),
env.getConfiguration())
+ .unwrap(PhoenixConnection.class)) {
+ cdcIndexMaintainer = cdcIndex.getIndexMaintainer(dataTable,
serverConnection);
+
+ ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut);
+ ImmutableBytesPtr rowKeyPtr = new
ImmutableBytesPtr(expiredRowPut.getRow());
+
+ Put cdcIndexPut =
cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ dataRowVG, rowKeyPtr, eventTimestamp, null, null, false,
+ region.getRegionInfo().getEncodedNameAsBytes());
+
+ rowKey = cdcIndexPut.getRow().clone();
+ System.arraycopy(compactionTimeBytes, 0, rowKey,
PartitionIdFunction.PARTITION_ID_LENGTH,
+ PDate.INSTANCE.getByteSize());
+ }
+
+ byte[] rowKeyWithoutTimestamp = new byte[rowKey.length -
PDate.INSTANCE.getByteSize()];
+ // copy PARTITION_ID() from offset 0 to 31
+ System.arraycopy(rowKey, 0, rowKeyWithoutTimestamp, 0,
+ PartitionIdFunction.PARTITION_ID_LENGTH);
+ // copy data table rowkey from offset (32 + 8) to end of rowkey
+ System.arraycopy(rowKey,
+ PartitionIdFunction.PARTITION_ID_LENGTH + PDate.INSTANCE.getByteSize(),
+ rowKeyWithoutTimestamp, PartitionIdFunction.PARTITION_ID_LENGTH,
+ rowKeyWithoutTimestamp.length -
PartitionIdFunction.PARTITION_ID_LENGTH);
+ ImmutableBytesPtr cacheKeyPtr = new
ImmutableBytesPtr(rowKeyWithoutTimestamp);
+
+ // Check if we already have an image for this row in the shared cache,
from other store
+ // compaction of the same region
+ Cache<ImmutableBytesPtr, Map<String, Object>> cache =
getSharedRowImageCache(config);
+ Map<String, Object> existingPreImage = cache.getIfPresent(cacheKeyPtr);
+ if (existingPreImage == null) {
+ existingPreImage = new HashMap<>();
+ cache.put(cacheKeyPtr, existingPreImage);
+ }
+
+ // Create CDC event with merged pre-image
+ Map<String, Object> cdcEvent =
+ createTTLDeleteCDCEvent(expiredRowPut, dataTable, existingPreImage);
+ byte[] cdcEventBytes =
JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
+ Put cdcIndexPut = buildCDCIndexPut(eventTimestamp, cdcEventBytes,
rowKey, cdcIndexMaintainer);
+
+ pendingMutations.put(cacheKeyPtr, cdcIndexPut);
+ }
+
+ /**
+ * Flushes a specific list of mutations to the CDC index table.
+ * @param mutations List of mutations to flush
+ */
+ private void flushMutations(List<Put> mutations) throws Exception {
+ if (mutations.isEmpty()) {
+ return;
+ }
+
+ Exception lastException = null;
+ for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries;
retryCount++) {
+ try (Table cdcIndexTable =
+
env.getConnection().getTable(TableName.valueOf(cdcIndex.getPhysicalName().getBytes())))
{
+ cdcIndexTable.put(mutations);
+ lastException = null;
+ LOGGER.debug("Successfully flushed batch of {} CDC mutations for
table {}",
+ mutations.size(), tableName);
+ break;
+ } catch (Exception e) {
+ lastException = e;
+ long backoffMs = 100;
+ LOGGER.warn("CDC batch mutation attempt {}/{} failed, retrying in
{}ms. Batch size: {}",
+ retryCount + 1, cdcTtlMutationMaxRetries, backoffMs,
mutations.size(), e);
+ try {
+ Thread.sleep(backoffMs);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted during CDC batch mutation
retry", ie);
+ }
+ }
+ }
+
+ if (lastException != null) {
+ LOGGER.error(
+ "Failed to flush CDC batch after {} attempts for table {}, index {}.
{} "
+ + "events are missed.",
+ cdcTtlMutationMaxRetries, tableName,
cdcIndex.getPhysicalName().getString(),
+ mutations.size(), lastException);
+ }
+ }
+
+ /**
+ * Finalizes the batch processor by flushing all accumulated mutations in
batches. This method
+ * processes all accumulated mutations and writes them to the CDC index in
batches of the
+ * configured batch size.
+ */
+ public void close() throws Exception {
+ if (pendingMutations.isEmpty()) {
+ LOGGER.trace("No CDC mutations to flush for table {}", tableName);
+ return;
+ }
+
+ int totalMutations = pendingMutations.size();
+ LOGGER.info("Flushing {} accumulated CDC mutations for table {} in
batches of {}",
+ totalMutations, tableName, batchSize);
+
+ List<Put> allMutations = new ArrayList<>(pendingMutations.values());
+
+ for (int i = 0; i < allMutations.size(); i += batchSize) {
+ int endIndex = Math.min(i + batchSize, allMutations.size());
+ List<Put> batch = allMutations.subList(i, endIndex);
+ flushMutations(batch);
+ LOGGER.debug("Flushed CDC batch {}/{} for table {} (mutations {}-{} of
{})",
+ (i / batchSize) + 1, (allMutations.size() + batchSize - 1) /
batchSize, tableName, i + 1,
+ endIndex, totalMutations);
+ }
+
+ pendingMutations.clear();
+
+ Cache<ImmutableBytesPtr, Map<String, Object>> cache =
getSharedRowImageCache(config);
+ LOGGER.info(
+ "CDC batch processor closed for table {}. Processed {} mutations in {}
batches."
+ + " Shared cache size: {}",
+ tableName, totalMutations, (totalMutations + batchSize - 1) /
batchSize, cache.size());
+ }
+ }
+
/**
* Finds the column name for a given cell in the data table.
* @param dataTable The data table
@@ -128,223 +339,80 @@ public final class CDCCompactionUtil {
/**
* Builds CDC index Put mutation.
- * @param cdcIndex The CDC index table
- * @param expiredRowPut The expired row data as a Put
- * @param eventTimestamp The timestamp for the CDC event
- * @param cdcEventBytes The CDC event data to store
- * @param dataTable The data table
- * @param env The region coprocessor environment
- * @param region The HBase region
- * @param compactionTimeBytes The compaction time as bytes
+ * @param eventTimestamp The timestamp for the CDC event
+ * @param cdcEventBytes The CDC event data to store
+ * @param rowKey The rowKey of the CDC index mutation
+ * @param cdcIndexMaintainer The index maintainer object for the CDC index
* @return The CDC index Put mutation
*/
- private static Put buildCDCIndexPut(PTable cdcIndex, Put expiredRowPut, long
eventTimestamp,
- byte[] cdcEventBytes, PTable dataTable, RegionCoprocessorEnvironment env,
Region region,
- byte[] compactionTimeBytes) throws Exception {
+ private static Put buildCDCIndexPut(long eventTimestamp, byte[]
cdcEventBytes, byte[] rowKey,
+ IndexMaintainer cdcIndexMaintainer) {
- try (PhoenixConnection serverConnection =
- QueryUtil.getConnectionOnServer(new Properties(), env.getConfiguration())
- .unwrap(PhoenixConnection.class)) {
+ Put newCdcIndexPut = new Put(rowKey, eventTimestamp);
- IndexMaintainer cdcIndexMaintainer =
cdcIndex.getIndexMaintainer(dataTable, serverConnection);
+
newCdcIndexPut.addColumn(cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ cdcIndexMaintainer.getEmptyKeyValueQualifier(), eventTimestamp,
+ QueryConstants.UNVERIFIED_BYTES);
- ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut);
- ImmutableBytesPtr rowKeyPtr = new
ImmutableBytesPtr(expiredRowPut.getRow());
+ // Add CDC event data
+ newCdcIndexPut.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+ QueryConstants.CDC_IMAGE_CQ_BYTES, eventTimestamp, cdcEventBytes);
- Put cdcIndexPut =
cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
- dataRowVG, rowKeyPtr, eventTimestamp, null, null, false,
- region.getRegionInfo().getEncodedNameAsBytes());
-
- byte[] rowKey = cdcIndexPut.getRow().clone();
- System.arraycopy(compactionTimeBytes, 0, rowKey,
PartitionIdFunction.PARTITION_ID_LENGTH,
- PDate.INSTANCE.getByteSize());
- Put newCdcIndexPut = new Put(rowKey, eventTimestamp);
-
-
newCdcIndexPut.addColumn(cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
- cdcIndexMaintainer.getEmptyKeyValueQualifier(), eventTimestamp,
- QueryConstants.UNVERIFIED_BYTES);
-
- // Add CDC event data
- newCdcIndexPut.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- QueryConstants.CDC_IMAGE_CQ_BYTES, eventTimestamp, cdcEventBytes);
-
- return newCdcIndexPut;
- }
+ return newCdcIndexPut;
}
/**
- * Generates and applies a CDC index mutation for TTL expired row with
retries if required.
- * @param cdcIndex The CDC index table
+ * Creates a CDC batch processor for the given data table and configuration.
* @param dataTable The data table
- * @param expiredRowPut The expired row data as a Put
- * @param eventTimestamp The timestamp for the CDC event
- * @param tableName The table name for logging
* @param env The region coprocessor environment
* @param region The HBase region
* @param compactionTimeBytes The compaction time as bytes
- * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations
- */
- private static void generateCDCIndexMutation(PTable cdcIndex, PTable
dataTable, Put expiredRowPut,
- long eventTimestamp, String tableName, RegionCoprocessorEnvironment env,
Region region,
- byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) throws Exception
{
- Map<String, Object> cdcEvent =
- createTTLDeleteCDCEvent(expiredRowPut, dataTable, new HashMap<>());
- byte[] cdcEventBytes =
JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
- Put cdcIndexPut = buildCDCIndexPut(cdcIndex, expiredRowPut,
eventTimestamp, cdcEventBytes,
- dataTable, env, region, compactionTimeBytes);
-
- Exception lastException = null;
- for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries;
retryCount++) {
- try (Table cdcIndexTable =
-
env.getConnection().getTable(TableName.valueOf(cdcIndex.getPhysicalName().getBytes())))
{
- CheckAndMutate checkAndMutate =
CheckAndMutate.newBuilder(cdcIndexPut.getRow())
- .ifNotExists(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- QueryConstants.CDC_IMAGE_CQ_BYTES)
- .build(cdcIndexPut);
- CheckAndMutateResult result =
cdcIndexTable.checkAndMutate(checkAndMutate);
-
- if (result.isSuccess()) {
- // Successfully inserted new CDC event - Single CF case
- lastException = null;
- break;
- } else {
- // Row already exists, need to retrieve existing pre-image and merge
- // Likely to happen for multi CF case
- Get get = new Get(cdcIndexPut.getRow());
- get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
- QueryConstants.CDC_IMAGE_CQ_BYTES);
- Result existingResult = cdcIndexTable.get(get);
-
- if (!existingResult.isEmpty()) {
- Cell existingCell = existingResult.getColumnLatestCell(
- QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
QueryConstants.CDC_IMAGE_CQ_BYTES);
-
- if (existingCell != null) {
- byte[] existingCdcBytes = CellUtil.cloneValue(existingCell);
- Map<String, Object> existingCdcEvent =
-
JacksonUtil.getObjectReader(HashMap.class).readValue(existingCdcBytes);
- Map<String, Object> existingPreImage = (Map<String, Object>)
existingCdcEvent
- .getOrDefault(QueryConstants.CDC_PRE_IMAGE, new HashMap<>());
-
- // Create new TTL delete event with merged pre-image
- Map<String, Object> mergedCdcEvent =
- createTTLDeleteCDCEvent(expiredRowPut, dataTable,
existingPreImage);
- byte[] mergedCdcEventBytes =
-
JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(mergedCdcEvent);
-
- Put mergedCdcIndexPut = buildCDCIndexPut(cdcIndex,
expiredRowPut, eventTimestamp,
- mergedCdcEventBytes, dataTable, env, region,
compactionTimeBytes);
-
- cdcIndexTable.put(mergedCdcIndexPut);
- lastException = null;
- break;
- } else {
- LOGGER.warn("Rare event: Skipping CDC TTL mutation because other
type"
- + " of CDC event is recorded at time {}", eventTimestamp);
- break;
- }
- } else {
- LOGGER.warn("Rare event.. Skipping CDC TTL mutation because other
type"
- + " of CDC event is recorded at time {}", eventTimestamp);
- break;
- }
- }
- } catch (Exception e) {
- lastException = e;
- long backoffMs = 100;
- LOGGER.warn("CDC mutation attempt {}/{} failed, retrying in {}ms",
retryCount + 1,
- cdcTtlMutationMaxRetries + 1, backoffMs, e);
- try {
- Thread.sleep(backoffMs);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted during CDC mutation retry", ie);
- }
- }
- }
- if (lastException != null) {
- LOGGER.error(
- "Failed to generate CDC mutation after {} attempts for table {}, index
"
- + "{}. The event update is missed.",
- cdcTtlMutationMaxRetries + 1, tableName,
cdcIndex.getPhysicalName().getString(),
- lastException);
- }
- }
-
- /**
- * Generates CDC TTL delete event and writes it directly to CDC index
tables. This bypasses the
- * normal CDC update path since the row is being expired.
- * @param expiredRow The cells of the expired row
- * @param tableName The table name for logging
* @param compactionTime The compaction timestamp
- * @param dataTable The data table
- * @param env The region coprocessor environment
- * @param region The HBase region
- * @param compactionTimeBytes The compaction time as bytes
+ * @param tableName The table name for logging
* @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations
+ * @param batchSize The batch size for CDC mutations
+ * @return CDCBatchProcessor instance or null if no active CDC index
*/
- private static void generateCDCTTLDeleteEvent(List<Cell> expiredRow, String
tableName,
- long compactionTime, PTable dataTable, RegionCoprocessorEnvironment env,
Region region,
- byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) {
- try {
- PTable cdcIndex = CDCUtil.getActiveCDCIndex(dataTable);
- if (cdcIndex == null) {
- LOGGER.warn("No active CDC index found for table {}", tableName);
- return;
- }
- Cell firstCell = expiredRow.get(0);
- byte[] dataRowKey = CellUtil.cloneRow(firstCell);
- Put expiredRowPut = new Put(dataRowKey);
-
- for (Cell cell : expiredRow) {
- expiredRowPut.add(cell);
- }
-
- try {
- generateCDCIndexMutation(cdcIndex, dataTable, expiredRowPut,
compactionTime, tableName, env,
- region, compactionTimeBytes, cdcTtlMutationMaxRetries);
- } catch (Exception e) {
- LOGGER.error("Failed to generate CDC mutation for index {}: {}",
- cdcIndex.getName().getString(), e.getMessage(), e);
- }
- } catch (Exception e) {
- LOGGER.error("Error generating CDC TTL delete event for table {}",
tableName, e);
+ public static CDCBatchProcessor createBatchProcessor(PTable dataTable,
+ RegionCoprocessorEnvironment env, Region region, byte[]
compactionTimeBytes,
+ long compactionTime, String tableName, int cdcTtlMutationMaxRetries, int
batchSize) {
+ PTable cdcIndex = CDCUtil.getActiveCDCIndex(dataTable);
+ if (cdcIndex == null) {
+ LOGGER.warn("No active CDC index found for table {}", tableName);
+ return null;
}
+ return new CDCBatchProcessor(cdcIndex, dataTable, env, region,
compactionTimeBytes,
+ compactionTime, tableName, cdcTtlMutationMaxRetries, batchSize);
}
/**
- * Handles TTL row expiration for CDC event generation. This method is
called when a row is
- * detected as expired during major compaction.
- * @param expiredRow The cells of the expired row
- * @param expirationType The type of TTL expiration
- * @param tableName The table name for logging purposes
- * @param compactionTime The timestamp when compaction started
- * @param table The Phoenix data table metadata
- * @param env The region coprocessor environment for
accessing HBase
- * resources
- * @param region The HBase region being compacted
- * @param compactionTimeBytes The compaction timestamp as byte array
for CDC index row key
- * construction
- * @param cdcTtlMutationMaxRetries Maximum number of retry attempts for CDC
mutation operations
+ * Handles TTL row expiration for CDC event generation using batch
processing. This method is
+ * called when a row is detected as expired during major compaction.
+ * @param expiredRow The cells of the expired row
+ * @param expirationType The type of TTL expiration
+ * @param tableName The table name for logging purposes
+ * @param batchProcessor The CDC batch processor instance
*/
static void handleTTLRowExpiration(List<Cell> expiredRow, String
expirationType, String tableName,
- long compactionTime, PTable table, RegionCoprocessorEnvironment env,
Region region,
- byte[] compactionTimeBytes, int cdcTtlMutationMaxRetries) {
+ CDCBatchProcessor batchProcessor) {
+ if (batchProcessor == null) {
+ return;
+ }
+
try {
Cell firstCell = expiredRow.get(0);
byte[] rowKey = CellUtil.cloneRow(firstCell);
- LOGGER.info(
+ LOGGER.debug(
"TTL row expiration detected: table={}, rowKey={}, expirationType={}, "
+ "cellCount={}, compactionTime={}",
- tableName, Bytes.toStringBinary(rowKey), expirationType,
expiredRow.size(), compactionTime);
-
- // Generate CDC TTL delete event with pre-image data
- generateCDCTTLDeleteEvent(expiredRow, tableName, compactionTime, table,
env, region,
- compactionTimeBytes, cdcTtlMutationMaxRetries);
+ tableName, Bytes.toStringBinary(rowKey), expirationType,
expiredRow.size(),
+ batchProcessor.eventTimestamp);
+ batchProcessor.addCDCEvent(expiredRow);
} catch (Exception e) {
LOGGER.error("Error handling TTL row expiration for CDC: table {}",
tableName, e);
}
}
+
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index c82a421e9b..f12dc77f72 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -164,6 +164,7 @@ public class CompactionScanner implements InternalScanner {
private final boolean isCdcTtlEnabled;
private final PTable table;
private final int cdcTtlMutationMaxRetries;
+ private CDCCompactionUtil.CDCBatchProcessor cdcBatchProcessor;
// Only for forcing minor compaction while testing
private static boolean forceMinorCompaction = false;
@@ -216,6 +217,15 @@ public class CompactionScanner implements InternalScanner {
env.getConfiguration().getInt(QueryServices.CDC_TTL_MUTATION_MAX_RETRIES,
QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
+ if (isCdcTtlEnabled) {
+ int cdcTtlMutationBatchSize =
+
env.getConfiguration().getInt(QueryServices.CDC_TTL_MUTATION_BATCH_SIZE,
+ QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_BATCH_SIZE);
+ cdcBatchProcessor =
+ CDCCompactionUtil.createBatchProcessor(table, env, region,
compactionTimeBytes,
+ compactionTime, tableName, cdcTtlMutationMaxRetries,
cdcTtlMutationBatchSize);
+ }
+
// Initialize the tracker that computes the TTL for the compacting table.
// The TTL tracker can be
// simple (one single TTL for the table) when the compacting table is not
Partitioned
@@ -414,9 +424,9 @@ public class CompactionScanner implements InternalScanner {
CompiledConditionalTTLExpression ttlExpr =
(CompiledConditionalTTLExpression) rowContext.ttlExprForRow;
if (ttlExpr.isExpired(result, true)) {
- if (isCdcTtlEnabled && !result.isEmpty()) {
+ if (isCdcTtlEnabled && cdcBatchProcessor != null && !result.isEmpty()) {
CDCCompactionUtil.handleTTLRowExpiration(result, "conditional_ttl",
tableName,
- compactionTime, table, env, region, compactionTimeBytes,
cdcTtlMutationMaxRetries);
+ cdcBatchProcessor);
}
// If the row is expired, purge the row
result.clear();
@@ -452,10 +462,23 @@ public class CompactionScanner implements InternalScanner
{
LOGGER.info("Closing CompactionScanner for table " + tableName + " store "
+ columnFamilyName
+ (major ? " major " : " not major ") + "compaction retained " +
outputCellCount + " of "
+ inputCellCount + " cells" + (phoenixLevelOnly ? " phoenix level only"
: ""));
+
if (forceMinorCompaction) {
forceMinorCompaction = false;
}
storeScanner.close();
+
+ // Flush any remaining CDC mutations in the batch
+ if (cdcBatchProcessor != null) {
+ try {
+ cdcBatchProcessor.close();
+ } catch (Exception e) {
+ LOGGER.error("Error closing CDC batch processor for table {}",
tableName, e);
+ throw new IOException("Failed to close CDC batch processor", e);
+ } finally {
+
CDCCompactionUtil.getSharedRowImageCache(env.getConfiguration()).cleanUp();
+ }
+ }
}
enum MatcherType {
@@ -2427,9 +2450,9 @@ public class CompactionScanner implements InternalScanner
{
// Only do this check for major compaction as for minor compactions we
don't expire cells.
// The row version should not be visible via the max lookback window.
Nothing to do
- if (isCdcTtlEnabled && !lastRow.isEmpty()) {
+ if (isCdcTtlEnabled && cdcBatchProcessor != null &&
!lastRow.isEmpty()) {
CDCCompactionUtil.handleTTLRowExpiration(lastRow, "time_based_ttl",
tableName,
- compactionTime, table, env, region, compactionTimeBytes,
cdcTtlMutationMaxRetries);
+ cdcBatchProcessor);
}
return;
}
@@ -2521,9 +2544,9 @@ public class CompactionScanner implements InternalScanner
{
// store is not the empty column family store.
return false;
}
- if (isCdcTtlEnabled && !lastRowVersion.isEmpty()) {
+ if (isCdcTtlEnabled && cdcBatchProcessor != null &&
!lastRowVersion.isEmpty()) {
CDCCompactionUtil.handleTTLRowExpiration(lastRowVersion,
"max_lookback_ttl", tableName,
- compactionTime, table, env, region, compactionTimeBytes,
cdcTtlMutationMaxRetries);
+ cdcBatchProcessor);
}
return true;
}
@@ -2575,7 +2598,6 @@ public class CompactionScanner implements InternalScanner
{
lastRowVersion.clear();
lastRowVersion.addAll(trimmedRow);
trimmedEmptyColumn.clear();
- ;
for (Cell cell : emptyColumn) {
if (cell.getTimestamp() >= minTimestamp) {
trimmedEmptyColumn.add(cell);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 97eecf2c42..b7f481d118 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -28,8 +28,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -782,6 +784,130 @@ public class TableTTLIT extends BaseTest {
TestUtil.majorCompact(getUtility(), table);
}
+ /**
+ * Test CDC batch mutations for TTL expired rows. This test creates a table
with TTL and CDC
+ * index, inserts 82 rows (to test batching: 25+25+25+7), lets them expire
via TTL, and verifies
+ * that all 82 rows have CDC TTL_DELETE events recorded with correct
pre-image data.
+ */
+ @Test
+ public void testCDCBatchMutationsForTTLExpiredRows() throws Exception {
+ final int maxLookbackAge =
+ tableLevelMaxLookback != null ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
+ final int numRows = 182;
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String tableName = generateUniqueName();
+ String cdcName = generateUniqueName();
+ ObjectMapper mapper = new ObjectMapper();
+
+ createTable(tableName);
+ conn.createStatement().execute("ALTER TABLE " + tableName
+ + " SET \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
+
+ String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE
(PRE, POST)";
+ conn.createStatement().execute(cdcSql);
+ conn.commit();
+
+ String cdcFullName = SchemaUtil.getTableName(null, cdcName);
+
+ long startTime = System.currentTimeMillis() + 1000;
+ startTime = (startTime / 1000) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(startTime);
+
+ // Track post-images for each row to verify against pre-images later
+ Map<String, Map<String, Object>> lastPostImages = new HashMap<>();
+
+ for (int i = 1; i <= numRows; i++) {
+ String rowId = "row" + i;
+ updateRow(conn, tableName, rowId);
+ injectEdge.incrementValue(100);
+ }
+
+ // Get the post-images from the UPSERT events
+ String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName;
+ try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery)) {
+ while (rs.next()) {
+ Map<String, Object> cdcEvent = mapper.readValue(rs.getString(3),
HashMap.class);
+ assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE,
+ cdcEvent.get(CDC_EVENT_TYPE));
+
+ Map<String, Object> postImage = (Map<String, Object>)
cdcEvent.get(CDC_POST_IMAGE);
+ String rowId = rs.getString(2);
+ lastPostImages.put(rowId, postImage);
+ }
+ }
+
+ assertEquals("Should have captured post-images for all " + numRows + "
rows", numRows,
+ lastPostImages.size());
+
+ // Advance time past TTL to expire all rows
+ injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
+
+ EnvironmentEdgeManager.reset();
+ flush(TableName.valueOf(tableName));
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+
+ Timestamp ts = new Timestamp(injectEdge.currentTime());
+ majorCompact(TableName.valueOf(tableName));
+
+ // Verify all rows are expired from data table
+ String dataQuery = "SELECT COUNT(*) FROM " + tableName;
+ try (ResultSet rs = conn.createStatement().executeQuery(dataQuery)) {
+ assertTrue("Should have count result", rs.next());
+ assertEquals("All rows should be expired from data table", 0,
rs.getInt(1));
+ }
+
+ // Verify all TTL_DELETE CDC events were generated
+ String ttlDeleteQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcFullName
+ + " WHERE PHOENIX_ROW_TIMESTAMP() >= ?";
+
+ Map<String, Map<String, Object>> ttlDeletePreImages = new HashMap<>();
+ int ttlDeleteEventCount = 0;
+
+ try (PreparedStatement pst = conn.prepareStatement(ttlDeleteQuery)) {
+ pst.setTimestamp(1, ts);
+
+ try (ResultSet rs = pst.executeQuery(ttlDeleteQuery)) {
+ while (rs.next()) {
+ ttlDeleteEventCount++;
+ Map<String, Object> cdcEvent = mapper.readValue(rs.getString(3),
HashMap.class);
+
+ assertEquals("Should be ttl_delete event",
CDC_TTL_DELETE_EVENT_TYPE,
+ cdcEvent.get(CDC_EVENT_TYPE));
+
+ assertTrue("TTL delete should have pre-image",
cdcEvent.containsKey(CDC_PRE_IMAGE));
+ Map<String, Object> preImage = (Map<String, Object>)
cdcEvent.get(CDC_PRE_IMAGE);
+ assertNotNull("Pre-image should not be null", preImage);
+ assertFalse("Pre-image should not be empty", preImage.isEmpty());
+
+ String rowId = rs.getString(2);
+ ttlDeletePreImages.put(rowId, preImage);
+ }
+ }
+ }
+
+ assertEquals("Should have exactly " + numRows + " TTL_DELETE events",
numRows,
+ ttlDeleteEventCount);
+ assertEquals("Should have pre-images for all " + numRows + " rows",
numRows,
+ ttlDeletePreImages.size());
+
+ // Verify that pre-images in TTL_DELETE events match the last
post-images from UPSERT events
+ for (String rowId : lastPostImages.keySet()) {
+ assertTrue("Should have TTL_DELETE pre-image for row " + rowId,
+ ttlDeletePreImages.containsKey(rowId));
+
+ Map<String, Object> lastPostImage = lastPostImages.get(rowId);
+ Map<String, Object> ttlDeletePreImage = ttlDeletePreImages.get(rowId);
+
+ assertEquals(
+ "Pre-image in TTL_DELETE should match last post-image from UPSERT
for row " + rowId,
+ lastPostImage, ttlDeletePreImage);
+ }
+
+ }
+ }
+
private void deleteRow(Connection conn, String tableName, String id) throws
SQLException {
String dml = "DELETE from " + tableName + " WHERE id = '" + id + "'";
conn.createStatement().executeUpdate(dml);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index c16accfbe7..3cf200e46e 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -39,6 +39,8 @@ import static org.junit.Assert.fail;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -57,14 +59,17 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.phoenix.coprocessor.CDCCompactionUtil;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -101,6 +106,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
+import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@@ -158,6 +164,7 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
Integer.toString(0));
props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
Long.toString(0));
+ props.put(QueryServices.CDC_TTL_SHARED_CACHE_EXPIRY_SECONDS,
Integer.toString(1));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -543,6 +550,9 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
// Trigger TTL expiration again
injectEdge.incrementValue(ttl);
+
+ Thread.sleep(700);
+ cleanUpSharedTtlImageCache();
doMajorCompaction(tableName);
// Verify all rows are expired from data table
@@ -1470,6 +1480,16 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
}
}
+ private void cleanUpSharedTtlImageCache()
+ throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
+ Method getSharedCacheMethod =
+ CDCCompactionUtil.class.getDeclaredMethod("getSharedRowImageCache",
Configuration.class);
+ getSharedCacheMethod.setAccessible(true);
+ Cache<ImmutableBytesPtr, Map<String, Object>> cache =
(Cache<ImmutableBytesPtr,
+ Map<String, Object>>) getSharedCacheMethod.invoke(null, (Configuration)
null);
+ cache.cleanUp();
+ }
+
private void doMajorCompaction(String tableName) throws IOException,
InterruptedException {
TestUtil.flush(getUtility(), TableName.valueOf(tableName));
TestUtil.majorCompact(getUtility(), TableName.valueOf(tableName));
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index e08a406af6..6541dd7338 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1653,13 +1653,12 @@ public class TestUtil {
TableOperation operation) throws Exception {
ConnectionQueryServices services =
conn.unwrap(PhoenixConnection.class).getQueryServices();
Configuration configuration = services.getConfiguration();
- org.apache.hadoop.hbase.client.Connection hbaseConn =
- ConnectionFactory.createConnection(configuration);
- Admin admin = services.getAdmin();
- RegionLocator regionLocator =
hbaseConn.getRegionLocator(TableName.valueOf(tableName));
- int nRegions = regionLocator.getAllRegionLocations().size();
- operation.execute(admin, regionLocator, nRegions);
-
+ try (org.apache.hadoop.hbase.client.Connection hbaseConn =
+ ConnectionFactory.createConnection(configuration); Admin admin =
services.getAdmin()) {
+ RegionLocator regionLocator =
hbaseConn.getRegionLocator(TableName.valueOf(tableName));
+ int nRegions = regionLocator.getAllRegionLocations().size();
+ operation.execute(admin, regionLocator, nRegions);
+ }
}
private static void waitForRegionChange(RegionLocator regionLocator, int
initialRegionCount)