This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 8b506916d1 PHOENIX-7653 New CDC Event for TTL expired rows (#2209)
8b506916d1 is described below
commit 8b506916d1fa942607945a55b73e8987ef6045bd
Author: Viraj Jasani <[email protected]>
AuthorDate: Mon Jul 7 17:22:01 2025 -0700
PHOENIX-7653 New CDC Event for TTL expired rows (#2209)
---
.../org/apache/phoenix/query/QueryConstants.java | 3 +
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../apache/phoenix/query/QueryServicesOptions.java | 6 +-
.../org/apache/phoenix/util/CDCChangeBuilder.java | 3 +-
.../main/java/org/apache/phoenix/util/CDCUtil.java | 34 +-
.../phoenix/coprocessor/CDCCompactionUtil.java | 395 +++++++++++++++++++++
.../coprocessor/CDCGlobalIndexRegionScanner.java | 101 +++++-
.../phoenix/coprocessor/CompactionScanner.java | 33 +-
.../java/org/apache/phoenix/end2end/Bson3IT.java | 291 ++++++++++++++-
.../org/apache/phoenix/end2end/TableTTLIT.java | 255 +++++++++++++
.../phoenix/schema/ConditionalTTLExpressionIT.java | 193 +++++++++-
11 files changed, 1298 insertions(+), 19 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 9911f5f0a3..57da865493 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -363,6 +363,9 @@ public interface QueryConstants {
String CDC_UPSERT_EVENT_TYPE = "upsert";
String CDC_DELETE_EVENT_TYPE = "delete";
String SPLITS_FILE = "SPLITS_FILE";
+ String CDC_TTL_DELETE_EVENT_TYPE = "ttl_delete";
+ String CDC_IMAGE_CQ = "_CDC_IMG_";
+ byte[] CDC_IMAGE_CQ_BYTES = Bytes.toBytes(CDC_IMAGE_CQ);
/**
* We mark counter values 0 to 10 as reserved. Value 0 is used by
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index e16e716958..e9d0e8d86e 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -440,6 +440,9 @@ public interface QueryServices extends SQLCloseable {
String CONNECTION_QUERY_SERVICE_HISTOGRAM_SIZE_RANGES =
"phoenix.conn.query.service.histogram.size.ranges";
+ // CDC TTL mutation retry configuration
+ String CDC_TTL_MUTATION_MAX_RETRIES =
"phoenix.cdc.ttl.mutation.max.retries";
+
// This config is used to move (copy and delete) the child links from the
SYSTEM.CATALOG to SYSTEM.CHILD_LINK table.
// As opposed to a copy and async (out of band) delete.
public static final String MOVE_CHILD_LINKS_DURING_UPGRADE_ENABLED =
"phoenix.move.child_link.during.upgrade";
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index bb5ef67e49..b0ab07d901 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,6 +24,7 @@ import static
org.apache.phoenix.query.QueryServices.ALLOW_VIEWS_ADD_NEW_CF_BASE
import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
import static
org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
import static
org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.CDC_TTL_MUTATION_MAX_RETRIES;
import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
import static
org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.CLUSTER_ROLE_BASED_MUTATION_BLOCK_ENABLED;
@@ -469,7 +470,7 @@ public class QueryServicesOptions {
public static final int DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE = 512;
public static final Boolean
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED =
false;
-
+ public static final int DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES = 5;
private final Configuration config;
@@ -585,7 +586,8 @@ public class QueryServicesOptions {
.setIfUnset(CQSI_THREAD_POOL_MAX_QUEUE,
DEFAULT_CQSI_THREAD_POOL_MAX_QUEUE)
.setIfUnset(CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT,
DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT)
- .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED,
DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED);
+ .setIfUnset(CQSI_THREAD_POOL_METRICS_ENABLED,
DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED)
+ .setIfUnset(CDC_TTL_MUTATION_MAX_RETRIES,
DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user
set
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
index 4bd2567ddf..101d0f9335 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java
@@ -31,6 +31,7 @@ import static
org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
public class CDCChangeBuilder {
@@ -69,7 +70,7 @@ public class CDCChangeBuilder {
}
public boolean isDeletionEvent() {
- return changeType == CDC_DELETE_EVENT_TYPE;
+ // Compare by value: reference equality only holds while both sides are the
+ // same interned constant. Constant-first equals is also null-safe.
+ return CDC_DELETE_EVENT_TYPE.equals(changeType)
+ || CDC_TTL_DELETE_EVENT_TYPE.equals(changeType);
}
public boolean isNonEmptyEvent() {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 4cdd48cf97..44c09055f2 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
+import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDataType;
import org.bson.RawBsonDocument;
@@ -112,6 +113,37 @@ public class CDCUtil {
return isCDCIndex(indexTable.getTableName().getString());
}
+    /**
+     * Tells whether the given index table is a CDC index whose index state is ACTIVE.
+     *
+     * @param indexTable The index table to inspect.
+     * @return true only if the table name identifies a CDC index and the state is ACTIVE.
+     */
+    public static boolean isCDCIndexActive(PTable indexTable) {
+        String indexName = indexTable.getTableName().getString();
+        if (!isCDCIndex(indexName)) {
+            return false;
+        }
+        return indexTable.getIndexState() == PIndexState.ACTIVE;
+    }
+
+    /**
+     * Reports whether at least one index of the given table is an active CDC index.
+     *
+     * @param table The PTable object; may be null.
+     * @return true if any index of the table is an active CDC index; false if there is none,
+     *         or if the table or its index list is null.
+     */
+    public static boolean hasActiveCDCIndex(PTable table) {
+        if (table == null) {
+            return false;
+        }
+        if (table.getIndexes() == null) {
+            return false;
+        }
+        for (PTable index : table.getIndexes()) {
+            if (isCDCIndexActive(index)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Return PTable of the first active CDC index for the given data table.
+     *
+     * @param dataTable The data table; may be null.
+     * @return the first index that is an active CDC index, or null if the table is null,
+     *         has no index list, or has no active CDC index.
+     */
+    public static PTable getActiveCDCIndex(PTable dataTable) {
+        // Same guard as hasActiveCDCIndex, so both lookups tolerate missing metadata
+        // instead of failing with a NullPointerException.
+        if (dataTable == null || dataTable.getIndexes() == null) {
+            return null;
+        }
+        return dataTable.getIndexes().stream()
+                .filter(CDCUtil::isCDCIndexActive)
+                .findFirst()
+                .orElse(null);
+    }
+
/**
* Check if the given table has any CDC indexes.
*
@@ -152,7 +184,7 @@ public class CDCUtil {
public static Object getColumnEncodedValue(Object value, PDataType
dataType) {
if (value != null) {
if (dataType.getSqlType() == PDataType.BSON_TYPE) {
- value = Bytes.toBytes(((RawBsonDocument)
value).getByteBuffer().asNIO());
+ value = ByteUtil.toBytes(((RawBsonDocument)
value).getByteBuffer().asNIO());
} else if (isBinaryType(dataType)) {
// Unfortunately, Base64.Encoder has no option to specify
offset and length so can't
// avoid copying bytes.
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
new file mode 100644
index 0000000000..78fd936c29
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCCompactionUtil.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.CheckAndMutateResult;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.PartitionIdFunction;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.JacksonUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.query.QueryConstants.NAME_SEPARATOR;
+
+/**
+ * Utility class for CDC (Change Data Capture) operations during compaction.
+ * This class contains utilities for handling TTL row expiration events and
generating
+ * CDC events with pre-image data that are written directly to CDC index
tables.
+ */
+public final class CDCCompactionUtil {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CDCCompactionUtil.class);
+
+ private CDCCompactionUtil() {
+ // Static utility holder: the private constructor prevents instantiation.
+ }
+
+    /**
+     * Resolves the display name of the data table column that a cell belongs to.
+     * <p>
+     * A column matches when its family and column qualifier bytes equal those of the cell.
+     * Columns of the default column family are reported by bare name, all others as
+     * {@code family + NAME_SEPARATOR + name}. Any exception raised during the lookup is
+     * logged and treated as "not found".
+     *
+     * @param dataTable The data table
+     * @param cell      The cell
+     * @return The column name, or null if no column matches or the lookup failed
+     */
+    private static String findColumnName(PTable dataTable, Cell cell) {
+        try {
+            final byte[] cellFamily = CellUtil.cloneFamily(cell);
+            final byte[] cellQualifier = CellUtil.cloneQualifier(cell);
+            final byte[] defaultFamily;
+            if (dataTable.getDefaultFamilyName() == null) {
+                defaultFamily = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+            } else {
+                defaultFamily = dataTable.getDefaultFamilyName().getBytes();
+            }
+            for (PColumn candidate : dataTable.getColumns()) {
+                if (candidate.getFamilyName() == null) {
+                    // No family to compare against the cell.
+                    continue;
+                }
+                byte[] candidateFamily = candidate.getFamilyName().getBytes();
+                boolean sameFamily = Bytes.equals(cellFamily, candidateFamily);
+                if (!sameFamily
+                        || !Bytes.equals(cellQualifier, candidate.getColumnQualifierBytes())) {
+                    continue;
+                }
+                if (Bytes.equals(defaultFamily, candidateFamily)) {
+                    return candidate.getName().getString();
+                }
+                return candidate.getFamilyName().getString() + NAME_SEPARATOR
+                        + candidate.getName().getString();
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error finding column name for cell: {}", CellUtil.toString(cell, true),
+                    e);
+        }
+        return null;
+    }
+
+    /**
+     * Assembles the CDC event map describing a TTL delete of the given row.
+     * <p>
+     * Every cell of the expired row that maps to a known data table column is decoded with
+     * the column's data type, encoded for CDC and stored in {@code preImage} under the column
+     * name. Cells without a matching column are ignored. The post-image is always empty.
+     *
+     * @param expiredRowPut The expired row data
+     * @param dataTable     The data table
+     * @param preImage      Pre-image map; it is filled in place and embedded in the result,
+     *                      so entries already present are kept unless overwritten
+     * @return CDC event map holding the event type, the pre-image and an empty post-image
+     * @throws Exception if a column lookup or value decoding fails
+     */
+    private static Map<String, Object> createTTLDeleteCDCEvent(Put expiredRowPut,
+                                                               PTable dataTable,
+                                                               Map<String, Object> preImage)
+            throws Exception {
+        for (List<Cell> cellsOfFamily : expiredRowPut.getFamilyCellMap().values()) {
+            for (Cell expiredCell : cellsOfFamily) {
+                String columnName = findColumnName(dataTable, expiredCell);
+                if (columnName == null) {
+                    continue;
+                }
+                PColumn column = dataTable.getColumnForColumnQualifier(
+                        CellUtil.cloneFamily(expiredCell), CellUtil.cloneQualifier(expiredCell));
+                Object decoded = column.getDataType().toObject(
+                        expiredCell.getValueArray(),
+                        expiredCell.getValueOffset(),
+                        expiredCell.getValueLength());
+                preImage.put(columnName,
+                        CDCUtil.getColumnEncodedValue(decoded, column.getDataType()));
+            }
+        }
+        Map<String, Object> ttlDeleteEvent = new HashMap<>();
+        ttlDeleteEvent.put(QueryConstants.CDC_EVENT_TYPE,
+                QueryConstants.CDC_TTL_DELETE_EVENT_TYPE);
+        ttlDeleteEvent.put(QueryConstants.CDC_PRE_IMAGE, preImage);
+        ttlDeleteEvent.put(QueryConstants.CDC_POST_IMAGE, Collections.emptyMap());
+        return ttlDeleteEvent;
+    }
+
+    /**
+     * Builds the Put that stores a TTL delete event in the CDC index table.
+     * <p>
+     * The index row key is first derived through the CDC index maintainer. The bytes that
+     * follow the partition id are then overwritten with {@code compactionTimeBytes}. The
+     * resulting Put carries two cells at {@code eventTimestamp}: the empty key value column
+     * set to UNVERIFIED and the CDC_IMAGE_CQ column holding the serialized event.
+     *
+     * @param cdcIndex            The CDC index table
+     * @param expiredRowPut       The expired row data as a Put
+     * @param eventTimestamp      The timestamp for the CDC event
+     * @param cdcEventBytes       The CDC event data to store
+     * @param dataTable           The data table
+     * @param env                 The region coprocessor environment
+     * @param region              The HBase region
+     * @param compactionTimeBytes The compaction time as bytes
+     * @return The CDC index Put mutation
+     * @throws Exception if the server connection or the index mutation cannot be created
+     */
+    private static Put buildCDCIndexPut(PTable cdcIndex, Put expiredRowPut, long eventTimestamp,
+                                        byte[] cdcEventBytes, PTable dataTable,
+                                        RegionCoprocessorEnvironment env, Region region,
+                                        byte[] compactionTimeBytes) throws Exception {
+
+        try (PhoenixConnection serverConnection = QueryUtil.getConnectionOnServer(new Properties(),
+                env.getConfiguration()).unwrap(PhoenixConnection.class)) {
+
+            IndexMaintainer maintainer = cdcIndex.getIndexMaintainer(dataTable, serverConnection);
+
+            // Let the maintainer produce the index mutation; only its row key is reused below.
+            Put maintainerPut = maintainer.buildUpdateMutation(
+                    GenericKeyValueBuilder.INSTANCE,
+                    new IndexUtil.SimpleValueGetter(expiredRowPut),
+                    new ImmutableBytesPtr(expiredRowPut.getRow()),
+                    eventTimestamp,
+                    null,
+                    null,
+                    false,
+                    region.getRegionInfo().getEncodedNameAsBytes());
+
+            // Work on a copy of the key and stamp the compaction time right after the
+            // partition id.
+            byte[] indexRowKey = maintainerPut.getRow().clone();
+            System.arraycopy(compactionTimeBytes, 0, indexRowKey,
+                    PartitionIdFunction.PARTITION_ID_LENGTH, PDate.INSTANCE.getByteSize());
+
+            Put ttlEventPut = new Put(indexRowKey, eventTimestamp);
+            byte[] emptyFamily = maintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+            byte[] emptyQualifier = maintainer.getEmptyKeyValueQualifier();
+            ttlEventPut.addColumn(emptyFamily, emptyQualifier, eventTimestamp,
+                    QueryConstants.UNVERIFIED_BYTES);
+            // The serialized CDC event itself.
+            ttlEventPut.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                    QueryConstants.CDC_IMAGE_CQ_BYTES, eventTimestamp, cdcEventBytes);
+            return ttlEventPut;
+        }
+    }
+
+    /**
+     * Generates and applies a CDC index mutation for a TTL expired row, retrying on failure.
+     * <p>
+     * The event is first written with a check-and-mutate that succeeds only while the CDC
+     * index row has no CDC_IMAGE_CQ cell. When the check fails, the existing event is read
+     * back, its pre-image is merged with the cells of this row and the merged event is
+     * written with a plain put. That read-merge-write sequence is not atomic. If the read
+     * finds no CDC_IMAGE_CQ cell, the TTL event is skipped with a warning.
+     * <p>
+     * At most {@code cdcTtlMutationMaxRetries} attempts are made, with a fixed backoff between
+     * attempts. A failure of the last attempt is logged as an error and not rethrown.
+     *
+     * @param cdcIndex                 The CDC index table
+     * @param dataTable                The data table
+     * @param expiredRowPut            The expired row data as a Put
+     * @param eventTimestamp           The timestamp for the CDC event
+     * @param tableName                The table name for logging
+     * @param env                      The region coprocessor environment
+     * @param region                   The HBase region
+     * @param compactionTimeBytes      The compaction time as bytes
+     * @param cdcTtlMutationMaxRetries Maximum number of attempts for the CDC mutation
+     * @throws Exception if the initial event or index mutation cannot be built, or if the
+     *                   thread is interrupted while backing off (as an IOException)
+     */
+    private static void generateCDCIndexMutation(PTable cdcIndex, PTable dataTable,
+                                                 Put expiredRowPut,
+                                                 long eventTimestamp, String tableName,
+                                                 RegionCoprocessorEnvironment env, Region region,
+                                                 byte[] compactionTimeBytes,
+                                                 int cdcTtlMutationMaxRetries)
+            throws Exception {
+        Map<String, Object> cdcEvent =
+                createTTLDeleteCDCEvent(expiredRowPut, dataTable, new HashMap<>());
+        byte[] cdcEventBytes =
+                JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(cdcEvent);
+        Put cdcIndexPut =
+                buildCDCIndexPut(cdcIndex, expiredRowPut, eventTimestamp, cdcEventBytes,
+                        dataTable, env, region, compactionTimeBytes);
+
+        final long backoffMs = 100;
+        Exception lastException = null;
+        for (int retryCount = 0; retryCount < cdcTtlMutationMaxRetries; retryCount++) {
+            try (Table cdcIndexTable = env.getConnection().getTable(TableName.valueOf(
+                    cdcIndex.getPhysicalName().getBytes()))) {
+                CheckAndMutate checkAndMutate =
+                        CheckAndMutate.newBuilder(cdcIndexPut.getRow())
+                                .ifNotExists(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                                        QueryConstants.CDC_IMAGE_CQ_BYTES)
+                                .build(cdcIndexPut);
+                CheckAndMutateResult result = cdcIndexTable.checkAndMutate(checkAndMutate);
+
+                if (result.isSuccess()) {
+                    // Successfully inserted new CDC event - Single CF case
+                    lastException = null;
+                    break;
+                }
+
+                // Row already exists, need to retrieve existing pre-image and merge
+                // Likely to happen for multi CF case
+                Get get = new Get(cdcIndexPut.getRow());
+                get.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                        QueryConstants.CDC_IMAGE_CQ_BYTES);
+                Result existingResult = cdcIndexTable.get(get);
+                if (existingResult.isEmpty()) {
+                    LOGGER.warn("Rare event.. Skipping CDC TTL mutation because other type"
+                            + " of CDC event is recorded at time {}", eventTimestamp);
+                    break;
+                }
+
+                Cell existingCell = existingResult.getColumnLatestCell(
+                        QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                        QueryConstants.CDC_IMAGE_CQ_BYTES);
+                if (existingCell == null) {
+                    LOGGER.warn("Rare event: Skipping CDC TTL mutation because other type"
+                            + " of CDC event is recorded at time {}", eventTimestamp);
+                    break;
+                }
+
+                byte[] existingCdcBytes = CellUtil.cloneValue(existingCell);
+                Map<String, Object> existingCdcEvent =
+                        JacksonUtil.getObjectReader(HashMap.class).readValue(existingCdcBytes);
+                // The stored event is JSON written by this class, so its pre-image is a
+                // string-keyed map.
+                @SuppressWarnings("unchecked")
+                Map<String, Object> existingPreImage =
+                        (Map<String, Object>) existingCdcEvent.getOrDefault(
+                                QueryConstants.CDC_PRE_IMAGE, new HashMap<>());
+
+                // Create new TTL delete event with merged pre-image
+                Map<String, Object> mergedCdcEvent =
+                        createTTLDeleteCDCEvent(expiredRowPut, dataTable, existingPreImage);
+                byte[] mergedCdcEventBytes =
+                        JacksonUtil.getObjectWriter(HashMap.class)
+                                .writeValueAsBytes(mergedCdcEvent);
+
+                Put mergedCdcIndexPut = buildCDCIndexPut(cdcIndex, expiredRowPut,
+                        eventTimestamp, mergedCdcEventBytes, dataTable, env, region,
+                        compactionTimeBytes);
+
+                cdcIndexTable.put(mergedCdcIndexPut);
+                lastException = null;
+                break;
+            } catch (Exception e) {
+                lastException = e;
+                int attempt = retryCount + 1;
+                if (attempt >= cdcTtlMutationMaxRetries) {
+                    // Last attempt: nothing will be retried, so skip the backoff and let the
+                    // error log below report the failure.
+                    break;
+                }
+                LOGGER.warn("CDC mutation attempt {}/{} failed, retrying in {}ms",
+                        attempt, cdcTtlMutationMaxRetries, backoffMs, e);
+                try {
+                    Thread.sleep(backoffMs);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Interrupted during CDC mutation retry", ie);
+                }
+            }
+        }
+        if (lastException != null) {
+            // The loop makes exactly cdcTtlMutationMaxRetries attempts; report that number.
+            LOGGER.error("Failed to generate CDC mutation after {} attempts for table {}, index "
+                            + "{}. The event update is missed.",
+                    cdcTtlMutationMaxRetries,
+                    tableName,
+                    cdcIndex.getPhysicalName().getString(),
+                    lastException);
+        }
+    }
+
+    /**
+     * Turns an expired row into a CDC TTL delete event and writes it straight to the active
+     * CDC index of the data table. Nothing is written when the table has no active CDC index.
+     * All failures are logged and swallowed, so this method never throws.
+     *
+     * @param expiredRow               The cells of the expired row
+     * @param tableName                The table name for logging
+     * @param compactionTime           The compaction timestamp, used as the event timestamp
+     * @param dataTable                The data table
+     * @param env                      The region coprocessor environment
+     * @param region                   The HBase region
+     * @param compactionTimeBytes      The compaction time as bytes
+     * @param cdcTtlMutationMaxRetries Maximum retry attempts for CDC mutations
+     */
+    private static void generateCDCTTLDeleteEvent(List<Cell> expiredRow, String tableName,
+                                                  long compactionTime, PTable dataTable,
+                                                  RegionCoprocessorEnvironment env, Region region,
+                                                  byte[] compactionTimeBytes,
+                                                  int cdcTtlMutationMaxRetries) {
+        try {
+            final PTable activeCdcIndex = CDCUtil.getActiveCDCIndex(dataTable);
+            if (activeCdcIndex == null) {
+                LOGGER.warn("No active CDC index found for table {}", tableName);
+                return;
+            }
+
+            // Collect the expired cells in a Put keyed by the row of the first cell.
+            final Put expiredRowPut = new Put(CellUtil.cloneRow(expiredRow.get(0)));
+            for (Cell expiredCell : expiredRow) {
+                expiredRowPut.add(expiredCell);
+            }
+
+            try {
+                generateCDCIndexMutation(activeCdcIndex, dataTable, expiredRowPut, compactionTime,
+                        tableName, env, region, compactionTimeBytes, cdcTtlMutationMaxRetries);
+            } catch (Exception e) {
+                LOGGER.error("Failed to generate CDC mutation for index {}: {}",
+                        activeCdcIndex.getName().getString(), e.getMessage(), e);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Error generating CDC TTL delete event for table {}",
+                    tableName, e);
+        }
+    }
+
+    /**
+     * Entry point for CDC handling of a row found expired by TTL.
+     * Logs the expiration and then emits the CDC TTL delete event carrying the pre-image.
+     * Any exception is logged and swallowed.
+     *
+     * @param expiredRow               The cells of the expired row
+     * @param expirationType           The type of TTL expiration, used for logging only
+     * @param tableName                The table name for logging purposes
+     * @param compactionTime           The timestamp when compaction started
+     * @param table                    The Phoenix data table metadata
+     * @param env                      The region coprocessor environment for accessing HBase
+     *                                 resources
+     * @param region                   The HBase region being compacted
+     * @param compactionTimeBytes      The compaction timestamp as byte array for CDC index row
+     *                                 key construction
+     * @param cdcTtlMutationMaxRetries Maximum number of retry attempts for CDC mutation
+     *                                 operations
+     */
+    static void handleTTLRowExpiration(List<Cell> expiredRow, String expirationType,
+                                       String tableName, long compactionTime, PTable table,
+                                       RegionCoprocessorEnvironment env, Region region,
+                                       byte[] compactionTimeBytes,
+                                       int cdcTtlMutationMaxRetries) {
+        try {
+            // The row key is taken from the first cell of the expired row.
+            final String printableRowKey =
+                    Bytes.toStringBinary(CellUtil.cloneRow(expiredRow.get(0)));
+            final int cellCount = expiredRow.size();
+
+            LOGGER.info("TTL row expiration detected: table={}, rowKey={}, expirationType={}, "
+                            + "cellCount={}, compactionTime={}",
+                    tableName, printableRowKey, expirationType, cellCount, compactionTime);
+
+            // Generate CDC TTL delete event with pre-image data
+            generateCDCTTLDeleteEvent(expiredRow, tableName, compactionTime, table, env, region,
+                    compactionTimeBytes, cdcTtlMutationMaxRetries);
+        } catch (Exception e) {
+            LOGGER.error("Error handling TTL row expiration for CDC: table {}", tableName, e);
+        }
+    }
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 4cbb4d6147..6cede61c45 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -22,9 +22,11 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -35,10 +37,9 @@ import
org.apache.phoenix.expression.SingleCellColumnExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.CDCChangeBuilder;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
@@ -50,12 +51,43 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.CDC_DATA_TABLE_DEF;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
+/**
+ * CDC (Change Data Capture) enabled region scanner for global indexes that
processes
+ * uncovered CDC index queries by reconstructing CDC events from index and
data table rows.
+ *
+ * <h3>Purpose</h3>
+ * This scanner extends {@link UncoveredGlobalIndexRegionScanner} to handle
CDC index queries
+ * where the CDC index doesn't contain all the columns needed to satisfy the
query. It bridges
+ * the gap between CDC index rows and the original data table to reconstruct
complete CDC events.
+ *
+ * <h3>CDC Event Processing</h3>
+ * The scanner processes two types of CDC events:
+ * <ul>
+ * <li><b>Regular CDC Events:</b> Requires data table scan to build CDC
event JSON from
+ * current/historical row state</li>
+ * <li><b>Pre-Image CDC Events:</b> Contains embedded CDC data (e.g., TTL
delete events)
+ * that can be returned directly without data table scan</li>
+ * </ul>
+ *
+ * <h3>CDC Event Structure</h3>
+ * The scanner produces CDC events in JSON format containing:
+ * <ul>
+ * <li><b>event_type:</b> "upsert", "delete", or "ttl_delete"</li>
+ * <li><b>pre_image:</b> Row state before the change (for
updates/deletes)</li>
+ * <li><b>post_image:</b> Row state after the change (for
inserts/updates)</li>
+ * </ul>
+ *
+ * @see UncoveredGlobalIndexRegionScanner
+ * @see CDCChangeBuilder
+ * @see CDCTableInfo
+ */
public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScanner {
private static final Logger LOGGER =
LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class);
@@ -101,6 +133,12 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
List<Cell> indexRow = indexRowIterator.next();
Cell indexCell = indexRow.get(0);
byte[] indexRowKey =
ImmutableBytesPtr.cloneCellRowIfNecessary(indexCell);
+ if (indexRow.size() > 1) {
+ boolean success = handlePreImageCDCEvent(indexRow,
indexRowKey, indexCell, result);
+ if (success) {
+ return true;
+ }
+ }
ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
indexToDataRowKeyMap.get(indexRowKey));
Result dataRow = dataRows.get(dataRowKey);
@@ -233,16 +271,29 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
private Result getCDCImage(byte[] indexRowKey, Cell firstCell) throws
JsonProcessingException {
byte[] value =
JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes(
changeBuilder.buildCDCEvent());
+ return createCDCResult(indexRowKey, firstCell,
changeBuilder.getChangeTimestamp(), value);
+ }
+
+ /**
+ * Generates the Result object for the CDC event.
+ *
+ * @param indexRowKey The CDC index row key
+ * @param firstCell The first cell
+ * @param timestamp The timestamp for the CDC event
+ * @param value The CDC event JSON bytes
+ * @return Result containing the CDC data
+ */
+ private Result createCDCResult(byte[] indexRowKey, Cell firstCell, long
timestamp,
+ byte[] value) {
CellBuilder builder =
CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
- Result cdcRow = Result.create(Arrays.asList(builder
+ return Result.create(Collections.singletonList(builder
.setRow(indexRowKey)
.setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(firstCell))
.setQualifier(cdcDataTableInfo.getCdcJsonColQualBytes())
- .setTimestamp(changeBuilder.getChangeTimestamp())
+ .setTimestamp(timestamp)
.setValue(value)
.setType(Cell.Type.Put)
.build()));
- return cdcRow;
}
private Object getColumnValue(Cell cell, PDataType dataType) {
@@ -259,4 +310,44 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
}
return CDCUtil.getColumnEncodedValue(value, dataType);
}
+
+    /**
+     * Handles CDC index rows that already carry the serialized CDC event in the
+     * CDC_IMAGE_CQ column (for example TTL delete events), so that no data table scan is
+     * needed. Rows without a CDC_IMAGE_CQ cell are not touched and are left to the caller.
+     *
+     * @param indexRow    The CDC index row cells
+     * @param indexRowKey The CDC index row key
+     * @param indexCell   The first cell of the index row; added to the result and used as the
+     *                    source of the column family for the CDC cell
+     * @param result      The result list to populate
+     * @return true if a CDC_IMAGE_CQ cell was found and the row was handled here, false if
+     *         the row carries no embedded CDC event
+     */
+    private boolean handlePreImageCDCEvent(List<Cell> indexRow, byte[] indexRowKey,
+                                           Cell indexCell, List<Cell> result) {
+        Cell cdcDataCell = null;
+        for (Cell cell : indexRow) {
+            if (CellUtil.matchingQualifier(cell, QueryConstants.CDC_IMAGE_CQ_BYTES)) {
+                cdcDataCell = cell;
+                break;
+            }
+        }
+        if (cdcDataCell == null) {
+            return false;
+        }
+        byte[] cdcEventBytes = CellUtil.cloneValue(cdcDataCell);
+        Result cdcRow = createCDCResult(indexRowKey, indexCell, cdcDataCell.getTimestamp(),
+                cdcEventBytes);
+
+        if (tupleProjector != null) {
+            result.add(indexCell);
+            IndexUtil.addTupleAsOneCell(result, new ResultTuple(cdcRow), tupleProjector, ptr);
+        } else {
+            // Without a projector no cell is emitted; the row still counts as handled.
+            result.clear();
+        }
+        // Runs once per row: only format the row key when debug logging is on.
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Processed CDC event with embedded data, skipped data table scan for"
+                    + " row key: {}", Bytes.toStringBinary(indexRowKey));
+        }
+        return true;
+    }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 2b9215b740..f3577611d0 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -82,6 +83,7 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TTLExpression;
import org.apache.phoenix.schema.TTLExpressionFactory;
import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PSmallint;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -150,6 +152,7 @@ public class CompactionScanner implements InternalScanner {
private final int familyCount;
private KeepDeletedCells keepDeletedCells;
private long compactionTime;
+ private byte[] compactionTimeBytes;
private final byte[] emptyCF;
private final byte[] emptyCQ;
private final byte[] storeColumnFamily;
@@ -163,6 +166,9 @@ public class CompactionScanner implements InternalScanner {
private long outputCellCount = 0;
private boolean phoenixLevelOnly = false;
private boolean isCDCIndex;
+ private final boolean isCdcTtlEnabled;
+ private final PTable table;
+ private final int cdcTtlMutationMaxRetries;
// Only for forcing minor compaction while testing
private static boolean forceMinorCompaction = false;
@@ -184,6 +190,7 @@ public class CompactionScanner implements InternalScanner {
this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table);
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+ compactionTimeBytes = PDate.INSTANCE.toBytes(new Date(compactionTime));
columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
tableName = region.getRegionInfo().getTable().getNameAsString();
@@ -205,7 +212,15 @@ public class CompactionScanner implements InternalScanner {
emptyCFStore = familyCount == 1 ||
columnFamilyName.equals(Bytes.toString(emptyCF))
|| localIndex;
- isCDCIndex = table != null ? CDCUtil.isCDCIndex(table) : false;
+ this.table = table;
+ isCDCIndex = CDCUtil.isCDCIndex(table);
+ isCdcTtlEnabled =
+ CDCUtil.hasActiveCDCIndex(table) && major &&
!table.isMultiTenant()
+ && table.getType() == PTableType.TABLE;
+ cdcTtlMutationMaxRetries = env.getConfiguration().getInt(
+ QueryServices.CDC_TTL_MUTATION_MAX_RETRIES,
+ QueryServicesOptions.DEFAULT_CDC_TTL_MUTATION_MAX_RETRIES);
+
// Initialize the tracker that computes the TTL for the compacting
table.
// The TTL tracker can be
// simple (one single TTL for the table) when the compacting table is
not Partitioned
@@ -412,6 +427,11 @@ public class CompactionScanner implements InternalScanner {
CompiledConditionalTTLExpression ttlExpr =
(CompiledConditionalTTLExpression) rowContext.ttlExprForRow;
if (ttlExpr.isExpired(result, true)) {
+ if (isCdcTtlEnabled && !result.isEmpty()) {
+ CDCCompactionUtil.handleTTLRowExpiration(result,
"conditional_ttl", tableName,
+ compactionTime, table, env, region,
compactionTimeBytes,
+ cdcTtlMutationMaxRetries);
+ }
// If the row is expired, purge the row
result.clear();
}
@@ -2591,6 +2611,12 @@ public class CompactionScanner implements
InternalScanner {
if (major && compactionTime - rowContext.maxTimestamp >
maxLookbackInMillis + ttl) {
// Only do this check for major compaction as for minor
compactions we don't expire cells.
// The row version should not be visible via the max lookback
window. Nothing to do
+
+ if (isCdcTtlEnabled && !lastRow.isEmpty()) {
+ CDCCompactionUtil.handleTTLRowExpiration(lastRow,
"time_based_ttl", tableName,
+ compactionTime, table, env, region,
compactionTimeBytes,
+ cdcTtlMutationMaxRetries);
+ }
return;
}
retainedCells.addAll(lastRow);
@@ -2683,6 +2709,11 @@ public class CompactionScanner implements
InternalScanner {
// store is not the empty column family store.
return false;
}
+ if (isCdcTtlEnabled && !lastRowVersion.isEmpty()) {
+ CDCCompactionUtil.handleTTLRowExpiration(lastRowVersion,
"max_lookback_ttl",
+ tableName, compactionTime, table, env, region,
compactionTimeBytes,
+ cdcTtlMutationMaxRetries);
+ }
return true;
}
// If the time gap between two back to back mutations is more than
ttl then we know
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
index 1adad43dc3..8b55334c50 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
@@ -21,12 +21,17 @@ package org.apache.phoenix.end2end;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.util.CDCUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
import org.bson.BsonArray;
import org.bson.BsonBinary;
import org.bson.BsonBoolean;
@@ -39,6 +44,8 @@ import org.bson.RawBsonDocument;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
@@ -52,12 +59,19 @@ import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Base64;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -65,8 +79,25 @@ import static org.junit.Assert.assertTrue;
* Tests for BSON.
*/
@Category(ParallelStatsDisabledTest.class)
+@RunWith(Parameterized.class)
public class Bson3IT extends ParallelStatsDisabledIT {
+ private final boolean columnEncoded;
+
+ public Bson3IT(boolean columnEncoded) {
+ this.columnEncoded = columnEncoded;
+ }
+
+ @Parameterized.Parameters(name =
+ "Bson3IT_columnEncoded={0}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][]{
+ {false},
+ {true}
+ });
+ }
+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static String getJsonString(String jsonFilePath) throws IOException {
@@ -86,7 +117,8 @@ public class Bson3IT extends ParallelStatsDisabledIT {
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String ddl = "CREATE TABLE " + tableName
+ " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
- + " CONSTRAINT pk PRIMARY KEY(PK1))";
+ + " CONSTRAINT pk PRIMARY KEY(PK1)) "
+ + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(ddl);
conn.createStatement().execute(cdcDdl);
@@ -627,7 +659,8 @@ public class Bson3IT extends ParallelStatsDisabledIT {
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String ddl = "CREATE TABLE " + tableName
+ " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
- + " CONSTRAINT pk PRIMARY KEY(PK1))";
+ + " CONSTRAINT pk PRIMARY KEY(PK1)) "
+ + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(ddl);
conn.createStatement().execute(cdcDdl);
@@ -1073,7 +1106,8 @@ public class Bson3IT extends ParallelStatsDisabledIT {
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String ddl = "CREATE TABLE " + tableName
+ " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
- + " CONSTRAINT pk PRIMARY KEY(PK1))";
+ + " CONSTRAINT pk PRIMARY KEY(PK1)) "
+ + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(ddl);
conn.createStatement().execute(cdcDdl);
@@ -1443,7 +1477,8 @@ public class Bson3IT extends ParallelStatsDisabledIT {
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String ddl = "CREATE TABLE " + tableName
+ " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
- + " CONSTRAINT pk PRIMARY KEY(PK1))";
+ + " CONSTRAINT pk PRIMARY KEY(PK1)) "
+ + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(ddl);
conn.createStatement().execute(cdcDdl);
@@ -1865,12 +1900,14 @@ public class Bson3IT extends ParallelStatsDisabledIT {
@Test
public void testCDCWithCaseSenstitiveTableAndPks() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- String tableName = "XYZ.\"test.table\"";
- String cdcName = "XYZ.\"CDC_test.table\"";
- String cdcNameWithoutSchema = "\"CDC_test.table\"";
+ String nameQuotes = "test.tableTESt-_123" + generateUniqueName();
+ String tableName = "XYZ.\"" + nameQuotes + "\"";
+ String cdcName = "XYZ.\"CDC_" + nameQuotes + "\"";
+ String cdcNameWithoutSchema = "\"CDC_" + nameQuotes + "\"";
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String ddl = "CREATE TABLE " + tableName +
- " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY
KEY(\"hk\"))";
+ " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY
KEY(\"hk\")) "
+ + (this.columnEncoded ? "" : "COLUMN_ENCODED_BYTES=0");
conn.createStatement().execute(ddl);
String cdcDdl = "CREATE CDC " + cdcNameWithoutSchema + " ON " +
tableName;
@@ -1930,6 +1967,244 @@ public class Bson3IT extends ParallelStatsDisabledIT {
actualDoc);
assertFalse("Should only have one CDC record", rs.next());
+
+ conn.createStatement().execute("DROP TABLE " + tableName + " CASCADE");
+ }
+ }
+
+ /**
+ * Tests BSON operations with SQL conditions on a TTL table, verifying ttl_delete CDC events after expiry.
+ */
+ @Test
+ public void testBsonOpsWithSqlConditionsUpdateSuccessWithTTL() throws
Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = generateUniqueName();
+ String cdcName = generateUniqueName();
+ final int ttlSeconds = 10;
+ final int maxLookbackAge = 5;
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "CREATE TABLE " + tableName
+ + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
+ + " CONSTRAINT pk PRIMARY KEY(PK1)) TTL="
+ + ttlSeconds + ", \"phoenix.max.lookback.age.seconds\" = " +
maxLookbackAge
+ + (this.columnEncoded ? "" : ", COLUMN_ENCODED_BYTES=0");
+ String cdcDdl = "CREATE CDC " + cdcName + " ON " + tableName;
+ conn.createStatement().execute(ddl);
+ conn.createStatement().execute(cdcDdl);
+
+ ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+ long startTime = System.currentTimeMillis() + 1000;
+ startTime = (startTime / 1000) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(startTime);
+
+ String sample1 = getJsonString("json/sample_01.json");
+ String sample2 = getJsonString("json/sample_02.json");
+ String sample3 = getJsonString("json/sample_03.json");
+ BsonDocument bsonDocument1 = RawBsonDocument.parse(sample1);
+ BsonDocument bsonDocument2 = RawBsonDocument.parse(sample2);
+ BsonDocument bsonDocument3 = RawBsonDocument.parse(sample3);
+
+ PreparedStatement stmt =
+ conn.prepareStatement("UPSERT INTO " + tableName + " VALUES
(?,?,?)");
+ stmt.setString(1, "pk0001");
+ stmt.setString(2, "0002");
+ stmt.setObject(3, bsonDocument1);
+ stmt.executeUpdate();
+
+ stmt.setString(1, "pk1010");
+ stmt.setString(2, "1010");
+ stmt.setObject(3, bsonDocument2);
+ stmt.executeUpdate();
+
+ stmt.setString(1, "pk1011");
+ stmt.setString(2, "1011");
+ stmt.setObject(3, bsonDocument3);
+ stmt.executeUpdate();
+
+ conn.commit();
+ injectEdge.incrementValue(1000);
+
+ String conditionExpression =
+ "press = $press AND track[0].shot[2][0].city.standard[5] =
$softly";
+
+ BsonDocument conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append("$press", new BsonString("beat"))
+ .append("$softly", new BsonString("softly")));
+
+ BsonDocument updateExp = new BsonDocument()
+ .append("$SET", new BsonDocument()
+ .append("browserling",
+ new
BsonBinary(PDouble.INSTANCE.toBytes(-505169340.54880095)))
+ .append("track[0].shot[2][0].city.standard[5]", new
BsonString("soft"))
+ .append("track[0].shot[2][0].city.problem[2]",
+ new
BsonString("track[0].shot[2][0].city.problem[2] + 529.435")))
+ .append("$UNSET", new BsonDocument()
+ .append("track[0].shot[2][0].city.flame", new
BsonNull()));
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END,"
+ + " C1 = ?");
+ stmt.setString(1, "pk0001");
+ stmt.setString(2, "0003");
+ stmt.executeUpdate();
+
+ String query = "SELECT * FROM " + tableName + " WHERE PK1 = 'pk0001'";
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ BsonDocument document1 = (BsonDocument) rs.getObject(3);
+
+ updateExp = new BsonDocument()
+ .append("$ADD", new BsonDocument()
+ .append("new_samples",
+ new BsonDocument().append("$set",
+ new BsonArray(Arrays.asList(
+ new
BsonBinary(Bytes.toBytes("Sample10")),
+ new
BsonBinary(Bytes.toBytes("Sample12")),
+ new
BsonBinary(Bytes.toBytes("Sample13")),
+ new
BsonBinary(Bytes.toBytes("Sample14"))
+ )))))
+ .append("$DELETE_FROM_SET", new BsonDocument()
+ .append("new_samples",
+ new BsonDocument().append("$set",
+ new BsonArray(Arrays.asList(
+ new
BsonBinary(Bytes.toBytes("Sample02")),
+ new
BsonBinary(Bytes.toBytes("Sample03"))
+ )))))
+ .append("$SET", new BsonDocument()
+ .append("newrecord", ((BsonArray)
(document1.get("track"))).get(0)))
+ .append("$UNSET", new BsonDocument()
+ .append("rather[3].outline.halfway.so[2][2]", new
BsonNull()));
+
+ conditionExpression =
+ "field_not_exists(newrecord) AND
field_exists(rather[3].outline.halfway.so[2][2])";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument());
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END");
+
+ stmt.setString(1, "pk1010");
+ stmt.executeUpdate();
+
+ updateExp = new BsonDocument()
+ .append("$SET", new BsonDocument()
+ .append("result[1].location.state", new
BsonString("AK")))
+ .append("$UNSET", new BsonDocument()
+ .append("result[4].emails[1]", new BsonNull()));
+
+ conditionExpression =
+ "result[2].location.coordinates.latitude > $latitude OR "
+ + "(field_exists(result[1].location) AND
result[1].location.state != $state" +
+ " AND field_exists(result[4].emails[1]))";
+
+ conditionDoc = new BsonDocument();
+ conditionDoc.put("$EXPR", new BsonString(conditionExpression));
+ conditionDoc.put("$VAL", new BsonDocument()
+ .append("$latitude", new BsonDouble(0))
+ .append("$state", new BsonString("AK")));
+
+ stmt = conn.prepareStatement("UPSERT INTO " + tableName
+ + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN"
+ + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() +
"')"
+ + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE
COL END");
+
+ stmt.setString(1, "pk1011");
+ stmt.executeUpdate();
+
+ conn.commit();
+ injectEdge.incrementValue(1000);
+
+ // Capture timestamp before TTL expiration
+ Timestamp beforeTTLTimestamp = new Timestamp(injectEdge.currentTime());
+
+ // Capture last post-images for each row before TTL expiration
+ Map<String, Map<String, Object>> lastPostImages = new HashMap<>();
+
+ String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcName +
+ " WHERE PHOENIX_ROW_TIMESTAMP() <= ? ORDER BY
PHOENIX_ROW_TIMESTAMP() DESC";
+ try (PreparedStatement pst = conn.prepareStatement(cdcQuery)) {
+ pst.setTimestamp(1, beforeTTLTimestamp);
+ try (ResultSet cdcRs = pst.executeQuery()) {
+ while (cdcRs.next()) {
+ String pk = cdcRs.getString(2);
+ if (!lastPostImages.containsKey(pk)) {
+ String cdcVal = cdcRs.getString(3);
+ Map<String, Object> cdcEvent = OBJECT_MAPPER.readValue(cdcVal,
HashMap.class);
+ if (cdcEvent.containsKey(CDC_POST_IMAGE)) {
+ lastPostImages.put(pk, (Map<String, Object>)
cdcEvent.get(CDC_POST_IMAGE));
+ }
+ }
+ }
+ }
+ }
+
+ // Verify all rows have post-images captured
+ assertEquals("Should have post-images for all 3 rows", 3,
lastPostImages.size());
+ assertNotNull("Should have post-image for pk0001",
lastPostImages.get("pk0001"));
+ assertNotNull("Should have post-image for pk1010",
lastPostImages.get("pk1010"));
+ assertNotNull("Should have post-image for pk1011",
lastPostImages.get("pk1011"));
+
+ // Advance time past TTL to expire rows
+ injectEdge.incrementValue((ttlSeconds + maxLookbackAge + 1) * 1000);
+
+ // Flush and major compact to trigger TTL expiration
+ Admin admin = getUtility().getAdmin();
+ admin.flush(TableName.valueOf(tableName));
+
+ TestUtil.majorCompact(getUtility(), TableName.valueOf(tableName));
+
+ // Verify all rows are expired from data table
+ String dataQuery = "SELECT * FROM " + tableName;
+ try (ResultSet dataRs = conn.createStatement().executeQuery(dataQuery)) {
+ assertFalse("All rows should be expired from data table",
dataRs.next());
+ }
+
+ // Verify TTL_DELETE CDC events were generated for all rows
+ String ttlDeleteQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
cdcName +
+ " WHERE PHOENIX_ROW_TIMESTAMP() > ?";
+ Map<String, Map<String, Object>> ttlDeleteEvents = new HashMap<>();
+
+ try (PreparedStatement pst = conn.prepareStatement(ttlDeleteQuery)) {
+ pst.setTimestamp(1, beforeTTLTimestamp);
+ try (ResultSet ttlRs = pst.executeQuery()) {
+ while (ttlRs.next()) {
+ String pk = ttlRs.getString(2);
+ String cdcVal = ttlRs.getString(3);
+ Map<String, Object> cdcEvent = OBJECT_MAPPER.readValue(cdcVal,
HashMap.class);
+
+ // Only process TTL delete events
+ if
(CDC_TTL_DELETE_EVENT_TYPE.equals(cdcEvent.get(CDC_EVENT_TYPE))) {
+ ttlDeleteEvents.put(pk, (Map<String, Object>)
cdcEvent.get(CDC_PRE_IMAGE));
+ }
+ }
+ }
+ }
+
+ // Verify TTL delete events for all rows
+ assertEquals("Should have TTL delete events for all 3 rows", 3,
ttlDeleteEvents.size());
+
+ // Verify pre-image consistency for each row
+ for (String pk : Arrays.asList("pk0001", "pk1010", "pk1011")) {
+ Map<String, Object> ttlPreImage = ttlDeleteEvents.get(pk);
+ assertNotNull("Should have TTL delete event for " + pk, ttlPreImage);
+ Map<String, Object> lastPostImage = lastPostImages.get(pk);
+ assertNotNull("TTL pre-image should not be null for " + pk,
ttlPreImage);
+ assertNotNull("Last post-image should not be null for " + pk,
lastPostImage);
+ assertEquals("TTL delete pre-image should match last post-image for "
+ pk,
+ lastPostImage, ttlPreImage);
+ }
+ } finally {
+ EnvironmentEdgeManager.reset();
}
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 1d68ce776c..7fdabc648b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -28,6 +28,9 @@ import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.*;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -46,10 +49,16 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static
org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
import static org.junit.Assert.*;
@Category(NeedsOwnMiniClusterTest.class)
@@ -314,6 +323,102 @@ public class TableTTLIT extends BaseTest {
}
}
+ @Test
+ public void testRowSpansMultipleTTLWindowsWithCdc() throws Exception {
+ final int maxLookbackAge = tableLevelMaxLookback != null
+ ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String schemaName = generateUniqueName();
+ String tableName = schemaName + "." + generateUniqueName();
+ String noCompactTableName = generateUniqueName();
+ createTable(tableName);
+ createTable(noCompactTableName);
+ conn.createStatement().execute("ALTER TABLE " + tableName
+ + " SET \"phoenix.max.lookback.age.seconds\" = " +
maxLookbackAge);
+
+ // Create CDC index for TTL verification
+ String cdcName = generateUniqueName();
+ String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName +
+ " INCLUDE (PRE, POST)";
+ conn.createStatement().execute(cdcSql);
+ conn.commit();
+
+ String cdcFullName = SchemaUtil.getTableName(null, schemaName +
"." + cdcName);
+
+ ObjectMapper mapper = new ObjectMapper();
+ long startTime = System.currentTimeMillis() + 1000;
+ startTime = (startTime / 1000) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(startTime);
+
+ // Track the last post-image from normal CDC events
+ Map<String, Object> lastPostImage = null;
+
+ for (int columnIndex = 1; columnIndex <= MAX_COLUMN_INDEX;
columnIndex++) {
+ String value = Integer.toString(RAND.nextInt(1000));
+ updateColumn(conn, tableName, "a", columnIndex, value);
+ updateColumn(conn, noCompactTableName, "a", columnIndex,
value);
+ conn.commit();
+
+ // Capture the last post-image from CDC events
+ String cdcQuery =
+ "SELECT PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" FROM " +
cdcFullName +
+ " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC LIMIT
1";
+ try (ResultSet rs =
conn.createStatement().executeQuery(cdcQuery)) {
+ if (rs.next()) {
+ Map<String, Object> cdcEvent =
+ mapper.readValue(rs.getString(2),
HashMap.class);
+ if (cdcEvent.containsKey(CDC_POST_IMAGE)) {
+ lastPostImage = (Map<String, Object>)
cdcEvent.get(CDC_POST_IMAGE);
+ }
+ }
+ }
+
+ injectEdge.incrementValue(ttl * 1000 - 1000);
+ }
+ assertNotNull("Last post-image should not be null", lastPostImage);
+
+ // Advance time past TTL to expire the row
+ injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
+
+ flush(TableName.valueOf(tableName));
+ majorCompact(TableName.valueOf(tableName));
+
+ // Verify row is expired from data table
+ String dataQuery = "SELECT * FROM " + tableName + " WHERE id =
'a'";
+ try (ResultSet rs =
conn.createStatement().executeQuery(dataQuery)) {
+ assertFalse("Row should be expired from data table",
rs.next());
+ }
+
+ // Verify TTL_DELETE CDC event was generated and compare pre-image
+ String cdcQuery = "SELECT \"CDC JSON\" FROM " + cdcFullName +
+ " ORDER BY PHOENIX_ROW_TIMESTAMP() DESC LIMIT 1";
+ try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery))
{
+ assertTrue("Should find TTL delete event", rs.next());
+ Map<String, Object> ttlDeleteEvent =
+ mapper.readValue(rs.getString(1), HashMap.class);
+ LOG.info("TTL delete event: {}", ttlDeleteEvent);
+
+ assertEquals("Should be ttl_delete event",
CDC_TTL_DELETE_EVENT_TYPE,
+ ttlDeleteEvent.get(CDC_EVENT_TYPE));
+
+ Map<String, Object> ttlPreImage =
+ (Map<String, Object>)
ttlDeleteEvent.get(CDC_PRE_IMAGE);
+ assertNotNull("TTL pre-image should not be null", ttlPreImage);
+
+ assertEquals(
+ "TTL delete pre-image should match last post-image
from normal CDC events",
+ lastPostImage, ttlPreImage);
+
+ assertFalse("No more event should be found", rs.next());
+ }
+
+ compareRow(conn, tableName, noCompactTableName, "a",
MAX_COLUMN_INDEX);
+ injectEdge.incrementValue(1000);
+ }
+ }
+
@Test
public void testMultipleRowsWithUpdatesMoreThanTTLApart() throws Exception
{
// for the purpose of this test only considering cases when
maxlookback is 0
@@ -544,6 +649,156 @@ public class TableTTLIT extends BaseTest {
}
}
+ /**
+ * Test CDC events for TTL expired rows. This test creates a table with
TTL and CDC index,
+ * verifies insert/update CDC events with pre/post images, then triggers
major compaction
+ * to expire rows and verifies TTL_DELETE events with pre-image data.
+ */
+ @Test
+ public void testCDCTTLExpiredRows() throws Exception {
+ final int maxLookbackAge = tableLevelMaxLookback != null
+ ? tableLevelMaxLookback : MAX_LOOKBACK_AGE;
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String schemaName = generateUniqueName();
+ String tableName = schemaName + "." + generateUniqueName();
+ String cdcName = generateUniqueName();
+ ObjectMapper mapper = new ObjectMapper();
+
+ createTable(tableName);
+ conn.createStatement().execute("ALTER TABLE " + tableName
+ + " SET \"phoenix.max.lookback.age.seconds\" = " +
maxLookbackAge);
+
+ String cdcSql = "CREATE CDC " + cdcName + " ON " + tableName +
+ " INCLUDE (PRE, POST)";
+ conn.createStatement().execute(cdcSql);
+ conn.commit();
+
+ String cdcIndexName =
+ schemaName + "." + CDCUtil.getCDCIndexName(schemaName +
"." + cdcName);
+ String cdcFullName = SchemaUtil.getTableName(null, schemaName +
"." + cdcName);
+
+ PTable cdcIndex = ((PhoenixConnection)
conn).getTableNoCache(cdcIndexName);
+ assertNotNull("CDC index should be created", cdcIndex);
+ assertTrue("CDC index should be CDC type",
CDCUtil.isCDCIndex(cdcIndex));
+
+ // Setup time injection
+ long startTime = System.currentTimeMillis() + 1000;
+ startTime = (startTime / 1000) * 1000;
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(startTime);
+
+ // Insert initial row
+ updateRow(conn, tableName, "row1");
+ long insertTime = injectEdge.currentTime();
+ injectEdge.incrementValue(1000);
+
+ // Update the row
+ updateColumn(conn, tableName, "row1", 1, "updated_val1");
+ updateColumn(conn, tableName, "row1", 2, "updated_val2");
+ conn.commit();
+ long updateTime = injectEdge.currentTime();
+ injectEdge.incrementValue(1000);
+
+ // Verify CDC events for insert and update
+ String cdcQuery = "SELECT PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\"
FROM " + cdcFullName;
+ Map<String, Object> postImage;
+ try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery))
{
+ // First event - insert
+ assertTrue("Should have insert CDC event", rs.next());
+ long eventTimestamp = rs.getTimestamp(1).getTime();
+ assertTrue("Insert event timestamp should be close to insert
time",
+ Math.abs(eventTimestamp - insertTime) < 2000);
+
+ Map<String, Object> cdcEvent =
mapper.readValue(rs.getString(2), HashMap.class);
+ assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE,
+ cdcEvent.get(CDC_EVENT_TYPE));
+ assertTrue("Should have post-image",
cdcEvent.containsKey(CDC_POST_IMAGE));
+
+ postImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE);
+ assertFalse("post image must contain something",
postImage.isEmpty());
+
+ // Second event - update
+ assertTrue("Should have update CDC event", rs.next());
+ eventTimestamp = rs.getTimestamp(1).getTime();
+ assertTrue("Update event timestamp should be close to update
time",
+ Math.abs(eventTimestamp - updateTime) < 2000);
+
+ cdcEvent = mapper.readValue(rs.getString(2), HashMap.class);
+ assertEquals("Should be upsert event", CDC_UPSERT_EVENT_TYPE,
+ cdcEvent.get(CDC_EVENT_TYPE));
+ assertTrue("Should have pre-image",
cdcEvent.containsKey(CDC_PRE_IMAGE));
+ assertTrue("Should have post-image",
cdcEvent.containsKey(CDC_POST_IMAGE));
+
+ Map<String, Object> preImage = (Map<String, Object>)
cdcEvent.get(CDC_PRE_IMAGE);
+ assertEquals("Comparison of last post-image with new
pre-image", postImage,
+ preImage);
+ postImage = (Map<String, Object>) cdcEvent.get(CDC_POST_IMAGE);
+ LOG.info("Post-image {}", postImage);
+ }
+
+ // Advance time past TTL to expire the row
+ injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000);
+
+ TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+ TestUtil.dumpTable(conn, TableName.valueOf(cdcIndexName));
+ flush(TableName.valueOf(tableName));
+ majorCompact(TableName.valueOf(tableName));
+ TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+ TestUtil.dumpTable(conn, TableName.valueOf(cdcIndexName));
+
+ // Verify row is expired from data table
+ String dataQuery = "SELECT * FROM " + tableName + " WHERE id =
'row1'";
+ try (ResultSet rs =
conn.createStatement().executeQuery(dataQuery)) {
+ assertFalse("Row should be expired from data table",
rs.next());
+ }
+
+ // Verify TTL_DELETE CDC event was generated
+ try (ResultSet rs = conn.createStatement().executeQuery(cdcQuery))
{
+ int eventCount = 0;
+ Map<String, Object> ttlDeleteEvent = null;
+
+ while (rs.next()) {
+ eventCount++;
+ Map<String, Object> cdcEvent =
mapper.readValue(rs.getString(2), HashMap.class);
+ String eventType = (String) cdcEvent.get(CDC_EVENT_TYPE);
+ assertEquals("Event type must be " +
CDC_TTL_DELETE_EVENT_TYPE + " but found " +
+ eventType,
+ CDC_TTL_DELETE_EVENT_TYPE, eventType);
+ if (CDC_TTL_DELETE_EVENT_TYPE.equals(eventType)) {
+ ttlDeleteEvent = cdcEvent;
+ }
+ }
+
+ assertEquals("Should have only 1 event for TTL_DELETE because
other events are " +
+ "expired due to major compaction", 1, eventCount);
+ assertNotNull("Should have TTL delete event", ttlDeleteEvent);
+
+ // Verify TTL delete event structure
+ assertEquals("Should be ttl_delete event",
CDC_TTL_DELETE_EVENT_TYPE,
+ ttlDeleteEvent.get(CDC_EVENT_TYPE));
+ assertTrue("TTL delete should have pre-image",
+ ttlDeleteEvent.containsKey(CDC_PRE_IMAGE));
+
+ Map<String, Object> preImage =
+ (Map<String, Object>)
ttlDeleteEvent.get(CDC_PRE_IMAGE);
+ assertEquals("Comparison of last post-image with new
pre-image", postImage,
+ preImage);
+ LOG.info("TTL delete event verified: {}", ttlDeleteEvent);
+ }
+
+ String cdcScanQuery = "SELECT \"CDC JSON\" FROM " + cdcFullName +
+ " WHERE \"CDC JSON\" LIKE '%ttl_delete%'";
+ try (ResultSet rs =
conn.createStatement().executeQuery(cdcScanQuery)) {
+ assertTrue("Should find TTL delete event via scan", rs.next());
+ Map<String, Object> cdcEvent =
mapper.readValue(rs.getString(1), HashMap.class);
+ assertEquals("Should be ttl_delete event",
CDC_TTL_DELETE_EVENT_TYPE,
+ cdcEvent.get(CDC_EVENT_TYPE));
+ }
+
+ LOG.info("CDC TTL test completed successfully for table: {}",
tableName);
+ }
+ }
private void flush(TableName table) throws IOException {
Admin admin = getUtility().getAdmin();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
index 6fdc674a04..8e582373f4 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/schema/ConditionalTTLExpressionIT.java
@@ -31,7 +31,6 @@ import static
org.apache.phoenix.schema.LiteralTTLExpression.TTL_EXPRESSION_FORE
import static org.apache.phoenix.util.TestUtil.retainSingleQuotes;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -43,6 +42,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
@@ -55,6 +55,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.CounterGroup;
@@ -66,6 +67,7 @@ import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.PhoenixTestBuilder;
import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder;
import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.OtherOptions;
import org.apache.phoenix.query.PhoenixTestBuilder.SchemaBuilder.TableOptions;
@@ -130,6 +132,8 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
private Map<Integer, String> dataRowPosToKey = Maps.newHashMap();
private Map<Integer, String> indexRowPosToKey = Maps.newHashMap();
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
public ConditionalTTLExpressionIT(boolean columnEncoded,
Integer tableLevelMaxLooback) {
this.columnEncoded = columnEncoded;
@@ -421,6 +425,193 @@ public class ConditionalTTLExpressionIT extends
ParallelStatsDisabledIT {
}
}
+ /**
+ * Tests CDC (Change Data Capture) functionality with TTL (Time To Live)
expired rows.
+ * This test validates the complete CDC lifecycle including:
+ */
+ @Test
+ public void testPhoenixRowTimestampWithCdc() throws Exception {
+ int ttl = 50 * 1000;
+ String ttlExpression = String.format(
+ "TO_NUMBER(CURRENT_TIME()) -
TO_NUMBER(PHOENIX_ROW_TIMESTAMP()) >= %d", ttl);
+ createTable(ttlExpression);
+ String tableName = schemaBuilder.getEntityTableName();
+ String cdcName = "cdc_" + generateUniqueName();
+ injectEdge();
+ int rowCount = 5;
+ long actual;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ // Initial Setup - Create CDC index on the table
+ conn.createStatement().execute("CREATE CDC " + cdcName + " ON " +
tableName);
+ populateTable(conn, rowCount);
+
+ // Verify initial row count
+ actual = TestUtil.getRowCount(conn, tableName, true);
+ assertEquals("Table should contain all inserted rows", 5, actual);
+
+ // Query initial CDC events (inserts)
+ String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
+ PhoenixTestBuilder.DDLDefaults.DEFAULT_SCHEMA_NAME + "." +
cdcName;
+
+ ResultSet resultSet =
conn.createStatement().executeQuery(cdcQuery);
+ List<Map<String, Object>> postImageList = new ArrayList<>();
+ while (resultSet.next()) {
+ String cdcVal = resultSet.getString(4);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal,
Map.class);
+
+ // Validate insert events have no pre-image but have post-image
+ Map<String, Object> preImage =
+ (Map<String, Object>)
map.get(QueryConstants.CDC_PRE_IMAGE);
+ assertTrue("Insert events should have empty pre-image",
preImage.isEmpty());
+
+ Map<String, Object> postImage =
+ (Map<String, Object>)
map.get(QueryConstants.CDC_POST_IMAGE);
+ assertFalse("Insert events should have non-empty post-image",
postImage.isEmpty());
+ postImageList.add(postImage);
+
+ assertEquals("Initial events should be UPSERT type",
+ QueryConstants.CDC_UPSERT_EVENT_TYPE,
+ map.get(QueryConstants.CDC_EVENT_TYPE));
+ }
+ assertEquals("Post image list size should be 5 but it is " +
postImageList.size(), 5,
+ postImageList.size());
+
+ // TTL Expiration - Advance time to trigger TTL expiration
+ injectEdge.incrementValue(ttl);
+ doMajorCompaction(tableName);
+
+ // Verify all rows are expired from data table
+ actual = TestUtil.getRowCount(conn, tableName, true);
+ assertEquals("All rows should be expired after TTL", 0, actual);
+
+ // TTL CDC Events - Validate TTL_DELETE events are generated
+ resultSet = conn.createStatement().executeQuery(cdcQuery);
+ int i = 0;
+ while (resultSet.next()) {
+ String cdcVal = resultSet.getString(4);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal,
Map.class);
+
+ // Validate TTL delete events
+ assertEquals("TTL expired rows should generate TTL_DELETE
events",
+ QueryConstants.CDC_TTL_DELETE_EVENT_TYPE,
+ map.get(QueryConstants.CDC_EVENT_TYPE));
+
+ Map<String, Object> preImage =
+ (Map<String, Object>)
map.get(QueryConstants.CDC_PRE_IMAGE);
+ assertFalse("TTL_DELETE events should have non-empty
pre-image",
+ preImage.isEmpty());
+
+ Map<String, Object> postImage =
+ (Map<String, Object>)
map.get(QueryConstants.CDC_POST_IMAGE);
+ assertTrue("TTL_DELETE events should have empty post-image",
postImage.isEmpty());
+
+ // TTL delete pre-image should match previous upsert post-image
+ assertEquals("TTL_DELETE pre-image should match original
insert post-image",
+ postImageList.get(i), preImage);
+ i++;
+ }
+ assertEquals("Num of TTL_DELETE events verified should be 5 but it
is " + i, 5, i);
+
+ // Update an expired row to bring it back
+ injectEdge.incrementValue(1);
+ long currentTime = injectEdge.currentTime();
+ updateColumn(conn, 1, "VAL4", currentTime);
+
+ // Verify the row
+ actual = TestUtil.getRowCount(conn, tableName, true);
+ assertEquals("Only one row should be resurrected after update", 1,
actual);
+
+ // Verify resurrected row has only updated column visible
+ try (ResultSet rs = readRow(conn, 1)) {
+ assertTrue("Resurrected row should exist", rs.next());
+ for (String col : COLUMNS) {
+ if (!col.equals("VAL4")) {
+ assertNull("Non-updated columns should be null in
resurrected row",
+ rs.getObject(col));
+ } else {
+ assertEquals("Updated column should have new
timestamp",
+ currentTime,
rs.getTimestamp("VAL4").getTime());
+ }
+ }
+ }
+
+ // Advance time beyond max lookback window
+ injectEdge.incrementValue(tableLevelMaxLookback * 1000L + 2);
+ doMajorCompaction(tableName);
+ CellCount expectedCellCount = new CellCount();
+ expectedCellCount.insertRow(dataRowPosToKey.get(1), 2);
+ validateTable(conn, tableName, expectedCellCount,
dataRowPosToKey.values());
+
+ // Query CDC events
+ cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " +
+ PhoenixTestBuilder.DDLDefaults.DEFAULT_SCHEMA_NAME + "." +
cdcName
+ + " WHERE PHOENIX_ROW_TIMESTAMP() >= ?";
+ PreparedStatement ps = conn.prepareStatement(cdcQuery);
+ ps.setTimestamp(1, new Timestamp(currentTime));
+ resultSet = ps.executeQuery();
+ postImageList = new ArrayList<>();
+ while (resultSet.next()) {
+ String cdcVal = resultSet.getString(4);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal,
Map.class);
+
+ assertEquals("Resurrection event should be UPSERT type",
+ QueryConstants.CDC_UPSERT_EVENT_TYPE,
+ map.get(QueryConstants.CDC_EVENT_TYPE));
+
+ Map<String, Object> preImage =
+ (Map<String, Object>)
map.get(QueryConstants.CDC_PRE_IMAGE);
+ assertTrue("Resurrection event should have empty pre-image",
preImage.isEmpty());
+
+ Map<String, Object> postImage =
+ (Map<String, Object>)
map.get(QueryConstants.CDC_POST_IMAGE);
+ assertFalse("Resurrection event should have non-empty
post-image",
+ postImage.isEmpty());
+ postImageList.add(postImage);
+ }
+ assertEquals("Post image list size should be 5 but it is " +
postImageList.size(), 1,
+ postImageList.size());
+
+ // Trigger TTL expiration again
+ injectEdge.incrementValue(ttl);
+ doMajorCompaction(tableName);
+
+ // Verify all rows are expired from data table
+ actual = TestUtil.getRowCount(conn, tableName, true);
+ assertEquals("All rows should be expired after TTL", 0, actual);
+
+ expectedCellCount = new CellCount();
+ validateTable(conn, tableName, expectedCellCount,
dataRowPosToKey.values());
+
+ // Validate second round of TTL_DELETE events
+ ps = conn.prepareStatement(cdcQuery);
+ ps.setTimestamp(1, new Timestamp(currentTime));
+ resultSet = ps.executeQuery();
+ i = 0;
+ while (resultSet.next()) {
+ String cdcVal = resultSet.getString(4);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal,
Map.class);
+
+ assertEquals("Second TTL expiration should generate TTL_DELETE
events",
+ QueryConstants.CDC_TTL_DELETE_EVENT_TYPE,
+ map.get(QueryConstants.CDC_EVENT_TYPE));
+
+ Map<String, Object> preImage =
+ (Map<String, Object>)
map.get(QueryConstants.CDC_PRE_IMAGE);
+ assertFalse("Second TTL_DELETE should have non-empty
pre-image",
+ preImage.isEmpty());
+
+ Map<String, Object> postImage =
+ (Map<String, Object>)
map.get(QueryConstants.CDC_POST_IMAGE);
+ assertTrue("Second TTL_DELETE should have empty post-image",
postImage.isEmpty());
+
+ assertEquals("Second TTL_DELETE pre-image should match
resurrection post-image",
+ postImageList.get(i), preImage);
+ i++;
+ }
+ assertEquals("Num of TTL_DELETE events verified should be 5 but it
is " + i, 1, i);
+ }
+ }
+
@Test
public void testDeleteMarkers() throws Exception {
String ttlCol = "VAL5";