This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 90d3e16c68 PHOENIX-7644 : CDC Stream improvements (#2196)
90d3e16c68 is described below
commit 90d3e16c687617b05f0a789afb8fd278ca51416e
Author: Palash Chauhan <[email protected]>
AuthorDate: Fri Jun 20 10:26:24 2025 -0700
PHOENIX-7644 : CDC Stream improvements (#2196)
Co-authored-by: Palash Chauhan
<[email protected]>
---
.../org/apache/phoenix/query/QueryConstants.java | 6 +-
.../main/java/org/apache/phoenix/util/CDCUtil.java | 11 +-
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 5 +
.../phoenix/coprocessor/PhoenixMasterObserver.java | 155 +++++++++++----------
.../apache/phoenix/end2end/CDCDefinitionIT.java | 67 ++++++++-
5 files changed, 164 insertions(+), 80 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 02c27096a0..9911f5f0a3 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -663,7 +663,8 @@ public interface QueryConstants {
TABLE_NAME + "," + STREAM_NAME + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
- TRANSACTIONAL + "=" + Boolean.FALSE;
+ TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
+ UPDATE_CACHE_FREQUENCY + "=" + "7200000";
String CREATE_CDC_STREAM_METADATA = "CREATE TABLE " +
SYSTEM_CATALOG_SCHEMA + ".\"" +
SYSTEM_CDC_STREAM_TABLE + "\"(\n" +
@@ -682,5 +683,6 @@ public interface QueryConstants {
TABLE_NAME + "," + STREAM_NAME + "," + PARTITION_ID + "," +
PARENT_PARTITION_ID + "))\n" +
HConstants.VERSIONS + "=%s,\n" +
ColumnFamilyDescriptorBuilder.KEEP_DELETED_CELLS + "=%s,\n" +
- TRANSACTIONAL + "=" + Boolean.FALSE;
+ TRANSACTIONAL + "=" + Boolean.FALSE + ",\n" +
+ UPDATE_CACHE_FREQUENCY + "=" + "7200000";
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 3117c5cebc..b728273cf0 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -22,6 +22,7 @@ import java.sql.SQLException;
import java.sql.Types;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.HashSet;
@@ -39,7 +40,6 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PVarchar;
import org.bson.RawBsonDocument;
public class CDCUtil {
@@ -99,6 +99,15 @@ public class CDCUtil {
return indexName.startsWith(CDC_INDEX_PREFIX);
}
+ public static boolean isCDCIndex(byte[] indexNameBytes) {
+ String indexName = Bytes.toString(indexNameBytes);
+ return isCDCIndex(indexName);
+ }
+
+ public static byte[] getCdcObjectName(byte[] cdcIndexName) {
+ return Arrays.copyOfRange(cdcIndexName, CDC_INDEX_PREFIX.length(),
cdcIndexName.length);
+ }
+
public static boolean isCDCIndex(PTable indexTable) {
return isCDCIndex(indexTable.getTableName().getString());
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 9165bd278c..6253b66eb1 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3333,6 +3333,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
// Recursively delete indexes
for (byte[] indexName : indexNames) {
+ if (CDCUtil.isCDCIndex(indexName)) {
+ byte[] cdcKey = SchemaUtil.getTableKey(tenantId, schemaName,
CDCUtil.getCdcObjectName(indexName));
+ Delete deleteCdc = new Delete(cdcKey, clientTimeStamp);
+ catalogMutations.add(deleteCdc);
+ }
byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName,
indexName);
// FIXME: Remove when unintentionally deprecated method is fixed
(HBASE-7870).
// FIXME: the version of the Delete constructor without the lock
args was introduced
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
index ce41e98e85..e2bcf73642 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixMasterObserver.java
@@ -276,33 +276,34 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
qb.append(" AND PARTITION_END_KEY = ? ");
}
- PreparedStatement pstmt = conn.prepareStatement(qb.toString());
- int index = 1;
- pstmt.setString(index++, tableName);
- pstmt.setString(index++, streamName);
- if (parentStartKey.length > 0) pstmt.setBytes(index++, parentStartKey);
- if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
- LOGGER.info("Query to get parent partition id: " + pstmt);
-
List<String> ancestorIDs = new ArrayList<>();
- ResultSet rs = pstmt.executeQuery();
List<Long> parentPartitionStartTimes = new ArrayList<>();
- if (rs.next()) {
- ancestorIDs.add(rs.getString(1));
- ancestorIDs.add(rs.getString(2));
- parentPartitionStartTimes.add(rs.getLong(3));
- } else {
- throw new ParentPartitionNotFound(
- String.format("Could not find parent of the provided
daughters: "
- + "startKeyA=%s endKeyA=%s startKeyB=%s
endKeyB=%s",
- Bytes.toStringBinary(regionInfoA.getStartKey()),
- Bytes.toStringBinary(regionInfoA.getEndKey()),
- Bytes.toStringBinary(regionInfoB.getStartKey()),
- Bytes.toStringBinary(regionInfoB.getEndKey())));
- }
- // if parent was a result of a merge, there will be multiple
grandparents.
- while (rs.next()) {
- ancestorIDs.add(rs.getString(2));
+ try (PreparedStatement pstmt = conn.prepareStatement(qb.toString())) {
+ int index = 1;
+ pstmt.setString(index++, tableName);
+ pstmt.setString(index++, streamName);
+ if (parentStartKey.length > 0) pstmt.setBytes(index++,
parentStartKey);
+ if (parentEndKey.length > 0) pstmt.setBytes(index++, parentEndKey);
+ LOGGER.info("Query to get parent partition id: " + pstmt);
+
+ ResultSet rs = pstmt.executeQuery();
+ if (rs.next()) {
+ ancestorIDs.add(rs.getString(1));
+ ancestorIDs.add(rs.getString(2));
+ parentPartitionStartTimes.add(rs.getLong(3));
+ } else {
+ throw new ParentPartitionNotFound(
+ String.format("Could not find parent of the provided
daughters: "
+ + "startKeyA=%s endKeyA=%s
startKeyB=%s endKeyB=%s",
+
Bytes.toStringBinary(regionInfoA.getStartKey()),
+ Bytes.toStringBinary(regionInfoA.getEndKey()),
+
Bytes.toStringBinary(regionInfoB.getStartKey()),
+
Bytes.toStringBinary(regionInfoB.getEndKey())));
+ }
+ // if parent was a result of a merge, there will be multiple
grandparents.
+ while (rs.next()) {
+ ancestorIDs.add(rs.getString(2));
+ }
}
return new Pair<>(ancestorIDs, parentPartitionStartTimes);
}
@@ -316,21 +317,22 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
RegionInfo parent) throws
SQLException {
List<String> ancestorIDs = new ArrayList<>();
ancestorIDs.add(parent.getEncodedName());
- PreparedStatement pstmt =
conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE);
- pstmt.setString(1, tableName);
- pstmt.setString(2, streamName);
- pstmt.setString(3, parent.getEncodedName());
- ResultSet rs = pstmt.executeQuery();
- if (rs.next()) {
- ancestorIDs.add(rs.getString(1));
- } else {
- throw new ParentPartitionNotFound(String.format(
- "Could not find parent of the provided merged region: %s",
- parent.getEncodedName()));
- }
- // if parent was a result of a merge, there will be multiple
grandparents.
- while (rs.next()) {
- ancestorIDs.add(rs.getString(1));
+ try (PreparedStatement pstmt =
conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE)) {
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, streamName);
+ pstmt.setString(3, parent.getEncodedName());
+ ResultSet rs = pstmt.executeQuery();
+ if (rs.next()) {
+ ancestorIDs.add(rs.getString(1));
+ } else {
+ throw new ParentPartitionNotFound(String.format(
+ "Could not find parent of the provided merged region:
%s",
+ parent.getEncodedName()));
+ }
+ // if parent was a result of a merge, there will be multiple
grandparents.
+ while (rs.next()) {
+ ancestorIDs.add(rs.getString(1));
+ }
}
return ancestorIDs;
}
@@ -346,27 +348,28 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
List<Long> parentPartitionStartTimes)
throws SQLException {
conn.setAutoCommit(false);
- PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL);
- for (RegionInfo daughter : daughters) {
- for (int i=0; i<parentPartitionIDs.size(); i++) {
- String partitionId = daughter.getEncodedName();
- long startTime = daughter.getRegionId();
- byte[] startKey = daughter.getStartKey();
- byte[] endKey = daughter.getEndKey();
- pstmt.setString(1, tableName);
- pstmt.setString(2, streamName);
- pstmt.setString(3, partitionId);
- pstmt.setString(4, parentPartitionIDs.get(i));
- pstmt.setLong(5, startTime);
- // endTime in not set when inserting a new partition
- pstmt.setNull(6, Types.BIGINT);
- pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
- pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
- pstmt.setLong(9, parentPartitionStartTimes.get(i));
- pstmt.executeUpdate();
+ try (PreparedStatement pstmt =
conn.prepareStatement(PARTITION_UPSERT_SQL)) {
+ for (RegionInfo daughter : daughters) {
+ for (int i=0; i<parentPartitionIDs.size(); i++) {
+ String partitionId = daughter.getEncodedName();
+ long startTime = daughter.getRegionId();
+ byte[] startKey = daughter.getStartKey();
+ byte[] endKey = daughter.getEndKey();
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, streamName);
+ pstmt.setString(3, partitionId);
+ pstmt.setString(4, parentPartitionIDs.get(i));
+ pstmt.setLong(5, startTime);
+ // endTime in not set when inserting a new partition
+ pstmt.setNull(6, Types.BIGINT);
+ pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
+ pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
+ pstmt.setLong(9, parentPartitionStartTimes.get(i));
+ pstmt.executeUpdate();
+ }
}
+ conn.commit();
}
- conn.commit();
}
/**
@@ -380,29 +383,31 @@ public class PhoenixMasterObserver implements
MasterObserver, MasterCoprocessor
long daughterStartTime) throws
SQLException {
conn.setAutoCommit(false);
// ancestorIDs = [parentID, grandparentID1, grandparentID2...]
- PreparedStatement pstmt =
conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL);
- for (int i=1; i<ancestorIDs.size(); i++) {
- pstmt.setString(1, tableName);
- pstmt.setString(2, streamName);
- pstmt.setString(3, ancestorIDs.get(0));
- pstmt.setString(4, ancestorIDs.get(i));
- pstmt.setLong(5, daughterStartTime);
- pstmt.executeUpdate();
+ try (PreparedStatement pstmt =
conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL)) {
+ for (int i=1; i<ancestorIDs.size(); i++) {
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, streamName);
+ pstmt.setString(3, ancestorIDs.get(0));
+ pstmt.setString(4, ancestorIDs.get(i));
+ pstmt.setLong(5, daughterStartTime);
+ pstmt.executeUpdate();
+ }
+ conn.commit();
}
- conn.commit();
}
/**
* Get the stream name on the given table if one exists in ENABLED state.
*/
private String getStreamName(Connection conn, String tableName) throws
SQLException {
- PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY);
- pstmt.setString(1, tableName);
- ResultSet rs = pstmt.executeQuery();
- if (rs.next()) {
- return rs.getString(1);
- } else {
- return null;
+ try (PreparedStatement pstmt =
conn.prepareStatement(STREAM_STATUS_QUERY)) {
+ pstmt.setString(1, tableName);
+ ResultSet rs = pstmt.executeQuery();
+ if (rs.next()) {
+ return rs.getString(1);
+ } else {
+ return null;
+ }
}
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
index bb3263a93f..42fb437656 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java
@@ -129,7 +129,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
}
@Test
- public void testCreateCaseSensitiveTable() throws Exception {
+ public void testCreateDropCaseSensitiveTable() throws Exception {
Connection conn = newConnection();
String tableName = "\"" + generateUniqueName().toLowerCase() + "\"";
conn.createStatement().execute(
@@ -145,10 +145,21 @@ public class CDCDefinitionIT extends CDCBaseIT {
String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(cdc_sql);
conn.createStatement().executeQuery("SELECT * FROM " + cdcName);
+
+ String drop_sql = forView ? "DROP VIEW " + tableName : "DROP TABLE " +
tableName;
+ conn.createStatement().execute(drop_sql);
+ String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+ try {
+ conn.createStatement().execute(drop_cdc_sql);
+ fail("Expected to fail as cdc table doesn't exist");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(),
e.getErrorCode());
+
assertTrue(e.getMessage().endsWith(SchemaUtil.getUnEscapedFullName(cdcName)));
+ }
}
@Test
- public void testCreateCaseSensitiveSchemaAndTable() throws Exception {
+ public void testCreateDropCaseSensitiveSchemaAndTable() throws Exception {
Connection conn = newConnection();
String schemaName = "\"" + generateUniqueName().toLowerCase() + "\"";
String tableName = SchemaUtil.getTableName(schemaName, "\"" +
generateUniqueName().toLowerCase() + "\"");
@@ -166,6 +177,17 @@ public class CDCDefinitionIT extends CDCBaseIT {
conn.createStatement().execute(cdc_sql);
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
conn.createStatement().executeQuery("SELECT * FROM " + cdcFullName);
+
+ String drop_sql = forView ? "DROP VIEW " + tableName : "DROP TABLE " +
tableName;
+ conn.createStatement().execute(drop_sql);
+ String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+ try {
+ conn.createStatement().execute(drop_cdc_sql);
+ fail("Expected to fail as cdc table doesn't exist");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(),
e.getErrorCode());
+
assertTrue(e.getMessage().endsWith(SchemaUtil.getUnEscapedFullName(cdcName)));
+ }
}
@Test
@@ -250,6 +272,7 @@ public class CDCDefinitionIT extends CDCBaseIT {
assertEquals(indexTable.getEncodingScheme(), NON_ENCODED_QUALIFIERS);
}
+ @Test
public void testDropCDC () throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -258,6 +281,8 @@ public class CDCDefinitionIT extends CDCBaseIT {
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + "
v1 INTEGER,"
+ " v2 DATE)");
String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ conn.createStatement().execute(cdc_sql);
String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(drop_cdc_sql);
@@ -303,6 +328,44 @@ public class CDCDefinitionIT extends CDCBaseIT {
}
}
+ @Test
+ public void testDropTable() throws SQLException {
+ Properties props = new Properties();
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ String tableName = generateUniqueName();
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + "
v1 INTEGER,"
+ + " v2 DATE)");
+ String cdcName = generateUniqueName();
+ String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ conn.createStatement().execute(cdc_sql);
+
+ String drop_table_sql = "DROP TABLE " + tableName;
+ conn.createStatement().execute(drop_table_sql);
+
+ // index should have been dropped
+ try (ResultSet rs = conn.createStatement().executeQuery("SELECT
index_type FROM " +
+ "system.catalog WHERE table_name = '" +
CDCUtil.getCDCIndexName(cdcName) +
+ "' AND column_name IS NULL and column_family IS NULL")) {
+ assertEquals(false, rs.next());
+ }
+ //cdc object should have been dropped
+ try (ResultSet rs = conn.createStatement().executeQuery("SELECT
cdc_include FROM " +
+ "system.catalog WHERE table_name = '" + cdcName +
+ "' AND column_name IS NULL and column_family IS NULL")) {
+ assertEquals(false, rs.next());
+ }
+
+ String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
+ try {
+ conn.createStatement().execute(drop_cdc_sql);
+ fail("Expected to fail as cdc table doesn't exist");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(),
e.getErrorCode());
+ assertTrue(e.getMessage().endsWith(cdcName));
+ }
+ }
+
@Test
public void testSelectCDCBadIncludeSpec() throws Exception {
Connection conn = newConnection();