This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new e4d9819910 PHOENIX-7652 : Clear CDC Stream metadata when table is
dropped (#2208)
e4d9819910 is described below
commit e4d98199104b67ad952d8e10bc906204207472f0
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Jul 3 09:49:21 2025 -0700
PHOENIX-7652 : Clear CDC Stream metadata when table is dropped (#2208)
Co-authored-by: Palash Chauhan
<[email protected]>
---
.../org/apache/phoenix/schema/MetaDataClient.java | 56 ++++++++++++++--------
.../main/java/org/apache/phoenix/util/CDCUtil.java | 13 +++++
.../phoenix/coprocessor/MetaDataEndpointImpl.java | 3 ++
.../org/apache/phoenix/end2end/CDCStreamIT.java | 40 ++++++++++++++++
4 files changed, 93 insertions(+), 19 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 96f11462c8..18d4cdb7fc 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -24,8 +24,10 @@ import static
org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCH
import static
org.apache.phoenix.exception.SQLExceptionCode.SALTING_NOT_ALLOWED_FOR_CDC;
import static
org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
import static org.apache.phoenix.query.QueryConstants.SPLITS_FILE;
@@ -2066,18 +2068,10 @@ public class MetaDataClient {
private void updateStreamPartitionMetadata(String tableName, String
cdcObjName) throws SQLException {
// create Stream with ENABLING status
long cdcIndexTimestamp =
CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName));
- String streamStatusSQL = "UPSERT INTO " +
SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
String streamName = String.format(CDC_STREAM_NAME_FORMAT,
tableName, cdcObjName, cdcIndexTimestamp,
CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
- try (PreparedStatement ps =
connection.prepareStatement(streamStatusSQL)) {
- ps.setString(1, tableName);
- ps.setString(2, streamName);
- ps.setString(3,
CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue());
- ps.executeUpdate();
- connection.commit();
- LOGGER.info("Marked stream {} for table {} as ENABLING",
streamName, tableName);
- }
+ markCDCStreamStatus(tableName, streamName,
CDCUtil.CdcStreamStatus.ENABLING);
// insert task to update partition metadata for stream
try {
@@ -2108,7 +2102,7 @@ public class MetaDataClient {
}
}
- private String getStreamNameIfCDCEnabled(String tableName) throws
SQLException {
+ public String getStreamNameIfCDCEnabled(String tableName) throws
SQLException {
// check if a stream is already enabled for this table
String query = "SELECT STREAM_NAME FROM " +
SYSTEM_CDC_STREAM_STATUS_NAME
+ " WHERE TABLE_NAME = ? AND STREAM_STATUS IN (?, ?)";
@@ -4021,18 +4015,10 @@ public class MetaDataClient {
String indexName =
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
// Mark CDC Stream as Disabled
long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp();
- String streamStatusSQL = "UPSERT INTO " +
SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
String streamName = String.format(CDC_STREAM_NAME_FORMAT,
parentTableName, cdcTableName, cdcIndexTimestamp,
CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
- try (PreparedStatement ps =
connection.prepareStatement(streamStatusSQL)) {
- ps.setString(1, parentTableName);
- ps.setString(2, streamName);
- ps.setString(3,
CDCUtil.CdcStreamStatus.DISABLED.getSerializedValue());
- ps.executeUpdate();
- connection.commit();
- LOGGER.info("Marked stream {} for table {} as DISABLED",
streamName, parentTableName);
- }
+ markCDCStreamStatus(parentTableName, streamName,
CDCUtil.CdcStreamStatus.DISABLED);
// Dropping the virtual CDC Table
dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC,
statement.ifExists(),
false, false);
@@ -4047,6 +4033,35 @@ public class MetaDataClient {
}
}
+    /**
+     * Removes every CDC stream record kept for {@code tableName}: first its rows in
+     * {@code SYSTEM_CDC_STREAM_STATUS_NAME}, then its partition rows in
+     * {@code SYSTEM_CDC_STREAM_NAME}. Each DELETE is committed separately, so the two
+     * deletions are not applied as one unit.
+     *
+     * NOTE(review): dropTable calls this right after resolving the PTable, apparently before
+     * the table itself is dropped; if the drop then fails, the stream metadata is already
+     * gone — confirm this ordering is intended.
+     *
+     * @param tableName value matched against the TABLE_NAME column of both system tables
+     *                  (dropTable passes the full table name)
+     * @throws SQLException if preparing, executing or committing either DELETE fails
+     */
+    private void deleteAllStreamMetadataForTable(String tableName) throws SQLException {
+        LOGGER.info("Deleting Stream Metadata for table {}", tableName);
+        // Status rows are deleted (and committed) before partition rows.
+        String[] deleteQueries = {
+            "DELETE FROM " + SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME = ?",
+            "DELETE FROM " + SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ?"
+        };
+        for (String deleteQuery : deleteQueries) {
+            try (PreparedStatement ps = connection.prepareStatement(deleteQuery)) {
+                ps.setString(1, tableName);
+                ps.executeUpdate();
+                connection.commit();
+            }
+        }
+    }
+
+    /**
+     * Writes one row (table name, stream name, serialized status) into
+     * {@code SYSTEM_CDC_STREAM_STATUS_NAME}, commits it, and logs the new status.
+     *
+     * @param tableName  table the stream belongs to
+     * @param streamName name of the CDC stream
+     * @param status     status whose serialized value is stored for the stream
+     * @throws SQLException if the upsert or the commit fails
+     */
+    private void markCDCStreamStatus(String tableName, String streamName,
+                                     CDCUtil.CdcStreamStatus status) throws SQLException {
+        // Positional VALUES: relies on the table's column order, presumably
+        // (TABLE_NAME, STREAM_NAME, STREAM_STATUS) — verify against the schema.
+        final String upsertSql =
+            "UPSERT INTO " + SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+        try (PreparedStatement upsert = connection.prepareStatement(upsertSql)) {
+            upsert.setString(1, tableName);
+            upsert.setString(2, streamName);
+            upsert.setString(3, status.getSerializedValue());
+            upsert.executeUpdate();
+            connection.commit();
+            LOGGER.info("Marked stream {} for table {} as {}", streamName, tableName, status);
+        }
+    }
+
private MutationState dropFunction(String functionName,
boolean ifExists) throws SQLException {
connection.rollback();
@@ -4094,6 +4109,9 @@ public class MetaDataClient {
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
try {
PTable ptable = connection.getTable(fullTableName);
+ if (PTableType.TABLE.equals(ptable.getType()) &&
CDCUtil.hasCDCIndex(ptable)) {
+ deleteAllStreamMetadataForTable(fullTableName);
+ }
if (parentTableName != null
&&!parentTableName.equals(ptable.getParentTableName().getString())) {
throw new SQLExceptionInfo.Builder(PARENT_TABLE_NOT_FOUND)
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index b728273cf0..4cdd48cf97 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -112,6 +112,19 @@ public class CDCUtil {
return isCDCIndex(indexTable.getTableName().getString());
}
+    /**
+     * Tells whether {@code table} has at least one CDC index among its indexes.
+     *
+     * @param table the table to inspect; may be {@code null}
+     * @return {@code true} if any index of the table is a CDC index; {@code false} when the
+     *         table is {@code null}, its index list is {@code null}, or no index is a CDC index
+     */
+    public static boolean hasCDCIndex(PTable table) {
+        if (table != null && table.getIndexes() != null) {
+            for (PTable index : table.getIndexes()) {
+                if (isCDCIndex(index)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
public static Scan setupScanForCDC(Scan scan) {
scan.setRaw(true);
scan.readAllVersions();
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 6253b66eb1..5d3fd6e5ba 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3337,6 +3337,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
byte[] cdcKey = SchemaUtil.getTableKey(tenantId, schemaName,
CDCUtil.getCdcObjectName(indexName));
Delete deleteCdc = new Delete(cdcKey, clientTimeStamp);
catalogMutations.add(deleteCdc);
+ // invalidate from cache
+ Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(env).getMetaDataCache();
+ metaDataCache.invalidate(new ImmutableBytesPtr(cdcKey));
}
byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName,
indexName);
// FIXME: Remove when unintentionally deprecated method is fixed
(HBASE-7870).
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 5891de066b..b4d9ba4642 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
@@ -30,9 +31,11 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -41,6 +44,7 @@ import org.junit.experimental.categories.Category;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
import
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -168,6 +172,42 @@ public class CDCStreamIT extends CDCBaseIT {
assertStreamStatus(conn, tableName, streamName,
CDCUtil.CdcStreamStatus.ENABLING);
}
+    /**
+     * Verifies that dropping a table with CDC enabled clears both its stream status
+     * (SYSTEM.CDC_STREAM_STATUS) and its stream partition rows (SYSTEM.CDC_STREAM), and
+     * that re-creating the same table and CDC object repopulates that metadata.
+     */
+    @Test
+    public void testStreamMetadataWhenTableIsDropped() throws SQLException {
+        String schemaName = "\"" + generateUniqueName().toLowerCase() + "\"";
+        String tableName = SchemaUtil.getTableName(schemaName,
+                "\"" + generateUniqueName().toLowerCase() + "\"");
+        String unescapedFullTableName = SchemaUtil.getUnEscapedFullName(tableName);
+        String cdcName = "\"" + generateUniqueName().toLowerCase() + "\"";
+        String createTableSql = "CREATE TABLE " + tableName
+                + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER, v2 DATE)";
+        String createCdcSql = "CREATE CDC " + cdcName + " ON " + tableName;
+        String dropTableSql = "DROP TABLE " + tableName;
+        // try-with-resources: the original never closed this connection or its result sets.
+        try (Connection conn = newConnection()) {
+            MetaDataClient mdc = new MetaDataClient(conn.unwrap(PhoenixConnection.class));
+            conn.createStatement().execute(createTableSql);
+            conn.createStatement().execute(createCdcSql);
+            TaskRegionObserver.SelfHealingTask task =
+                    new TaskRegionObserver.SelfHealingTask(
+                            taskRegionEnvironment,
+                            QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+            // Let the task handler populate partition metadata for the new stream.
+            task.run();
+            Assert.assertTrue(hasCdcStreamPartitionRows(conn, unescapedFullTableName));
+            Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+
+            // Dropping the table must clear both stream status and partition metadata.
+            conn.createStatement().execute(dropTableSql);
+            Assert.assertNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+            Assert.assertFalse(hasCdcStreamPartitionRows(conn, unescapedFullTableName));
+
+            // Re-creating the same table with the same CDC name must repopulate the metadata.
+            conn.createStatement().execute(createTableSql);
+            conn.createStatement().execute(createCdcSql);
+            Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+            task.run();
+            Assert.assertTrue(hasCdcStreamPartitionRows(conn, unescapedFullTableName));
+        }
+    }
+
+    /**
+     * Returns whether SYSTEM.CDC_STREAM holds at least one partition row for the given
+     * (unescaped, fully qualified) table name. Uses a bound parameter instead of string
+     * concatenation and closes its statement and result set.
+     */
+    private static boolean hasCdcStreamPartitionRows(Connection conn, String tableName)
+            throws SQLException {
+        try (PreparedStatement ps = conn.prepareStatement(
+                "SELECT * FROM SYSTEM.CDC_STREAM WHERE TABLE_NAME = ?")) {
+            ps.setString(1, tableName);
+            try (ResultSet rs = ps.executeQuery()) {
+                return rs.next();
+            }
+        }
+    }
+
/**
* Split the only region of the table with empty start key and empty end
key.
*/