This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new e4d9819910 PHOENIX-7652 : Clear CDC Stream metadata when table is 
dropped (#2208)
e4d9819910 is described below

commit e4d98199104b67ad952d8e10bc906204207472f0
Author: Palash Chauhan <[email protected]>
AuthorDate: Thu Jul 3 09:49:21 2025 -0700

    PHOENIX-7652 : Clear CDC Stream metadata when table is dropped (#2208)
    
    Co-authored-by: Palash Chauhan 
<[email protected]>
---
 .../org/apache/phoenix/schema/MetaDataClient.java  | 56 ++++++++++++++--------
 .../main/java/org/apache/phoenix/util/CDCUtil.java | 13 +++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  3 ++
 .../org/apache/phoenix/end2end/CDCStreamIT.java    | 40 ++++++++++++++++
 4 files changed, 93 insertions(+), 19 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 96f11462c8..18d4cdb7fc 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -24,8 +24,10 @@ import static 
org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCH
 import static 
org.apache.phoenix.exception.SQLExceptionCode.SALTING_NOT_ALLOWED_FOR_CDC;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_TABLE;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static org.apache.phoenix.query.QueryConstants.SPLITS_FILE;
@@ -2066,18 +2068,10 @@ public class MetaDataClient {
     private void updateStreamPartitionMetadata(String tableName, String 
cdcObjName) throws SQLException {
         // create Stream with ENABLING status
         long cdcIndexTimestamp = 
CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName));
-        String streamStatusSQL = "UPSERT INTO " + 
SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
         String streamName = String.format(CDC_STREAM_NAME_FORMAT,
                 tableName, cdcObjName, cdcIndexTimestamp,
                 CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
-        try (PreparedStatement ps = 
connection.prepareStatement(streamStatusSQL)) {
-            ps.setString(1, tableName);
-            ps.setString(2, streamName);
-            ps.setString(3, 
CDCUtil.CdcStreamStatus.ENABLING.getSerializedValue());
-            ps.executeUpdate();
-            connection.commit();
-            LOGGER.info("Marked stream {} for table {} as ENABLING", 
streamName, tableName);
-        }
+        markCDCStreamStatus(tableName, streamName, 
CDCUtil.CdcStreamStatus.ENABLING);
 
         // insert task to update partition metadata for stream
         try {
@@ -2108,7 +2102,7 @@ public class MetaDataClient {
         }
     }
 
-    private String getStreamNameIfCDCEnabled(String tableName) throws 
SQLException {
+    public String getStreamNameIfCDCEnabled(String tableName) throws 
SQLException {
         // check if a stream is already enabled for this table
         String query = "SELECT STREAM_NAME FROM " + 
SYSTEM_CDC_STREAM_STATUS_NAME
                 + " WHERE TABLE_NAME = ? AND STREAM_STATUS IN (?, ?)";
@@ -4021,18 +4015,10 @@ public class MetaDataClient {
         String indexName = 
CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
         // Mark CDC Stream as Disabled
         long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp();
-        String streamStatusSQL = "UPSERT INTO " + 
SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
         String streamName = String.format(CDC_STREAM_NAME_FORMAT,
                 parentTableName, cdcTableName, cdcIndexTimestamp,
                 CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
-        try (PreparedStatement ps = 
connection.prepareStatement(streamStatusSQL)) {
-            ps.setString(1, parentTableName);
-            ps.setString(2, streamName);
-            ps.setString(3, 
CDCUtil.CdcStreamStatus.DISABLED.getSerializedValue());
-            ps.executeUpdate();
-            connection.commit();
-            LOGGER.info("Marked stream {} for table {} as DISABLED", 
streamName, parentTableName);
-        }
+        markCDCStreamStatus(parentTableName, streamName, 
CDCUtil.CdcStreamStatus.DISABLED);
         // Dropping the virtual CDC Table
         dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, 
statement.ifExists(),
                 false, false);
@@ -4047,6 +4033,35 @@ public class MetaDataClient {
         }
     }
 
+    private void deleteAllStreamMetadataForTable(String tableName) throws 
SQLException {
+        String deleteStreamStatusQuery = "DELETE FROM " + 
SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME = ?";
+        String deleteStreamPartitionsQuery = "DELETE FROM " + 
SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ?";
+        LOGGER.info("Deleting Stream Metadata for table {}", tableName);
+        try (PreparedStatement ps = 
connection.prepareStatement(deleteStreamStatusQuery)) {
+            ps.setString(1, tableName);
+            ps.executeUpdate();
+            connection.commit();
+        }
+        try (PreparedStatement ps = 
connection.prepareStatement(deleteStreamPartitionsQuery)) {
+            ps.setString(1, tableName);
+            ps.executeUpdate();
+            connection.commit();
+        }
+    }
+
+    private void markCDCStreamStatus(String tableName, String streamName,
+                                     CDCUtil.CdcStreamStatus status) throws 
SQLException {
+        String streamStatusSQL = "UPSERT INTO " + 
SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
+        try (PreparedStatement ps = 
connection.prepareStatement(streamStatusSQL)) {
+            ps.setString(1, tableName);
+            ps.setString(2, streamName);
+            ps.setString(3, status.getSerializedValue());
+            ps.executeUpdate();
+            connection.commit();
+            LOGGER.info("Marked stream {} for table {} as {}", streamName, 
tableName, status);
+        }
+    }
+
     private MutationState dropFunction(String functionName,
             boolean ifExists) throws SQLException {
         connection.rollback();
@@ -4094,6 +4109,9 @@ public class MetaDataClient {
         String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         try {
             PTable ptable = connection.getTable(fullTableName);
+            if (PTableType.TABLE.equals(ptable.getType()) && 
CDCUtil.hasCDCIndex(ptable)) {
+                deleteAllStreamMetadataForTable(fullTableName);
+            }
             if (parentTableName != null 
&&!parentTableName.equals(ptable.getParentTableName().getString())) {
                 throw new SQLExceptionInfo.Builder(PARENT_TABLE_NOT_FOUND)
                         
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index b728273cf0..4cdd48cf97 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -112,6 +112,19 @@ public class CDCUtil {
         return isCDCIndex(indexTable.getTableName().getString());
     }
 
+    /**
+     * Check if the given table has any CDC indexes.
+     *
+     * @param table The PTable object.
+     * @return true if the table has a CDC index, false otherwise.
+     */
+    public static boolean hasCDCIndex(PTable table) {
+        if (table == null || table.getIndexes() == null) {
+            return false;
+        }
+        return table.getIndexes().stream().anyMatch(CDCUtil::isCDCIndex);
+    }
+
     public static Scan setupScanForCDC(Scan scan) {
         scan.setRaw(true);
         scan.readAllVersions();
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 6253b66eb1..5d3fd6e5ba 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3337,6 +3337,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                 byte[] cdcKey = SchemaUtil.getTableKey(tenantId, schemaName, 
CDCUtil.getCdcObjectName(indexName));
                 Delete deleteCdc = new Delete(cdcKey, clientTimeStamp);
                 catalogMutations.add(deleteCdc);
+                // invalidate from cache
+                Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache = 
GlobalCache.getInstance(env).getMetaDataCache();
+                metaDataCache.invalidate(new ImmutableBytesPtr(cdcKey));
             }
             byte[] indexKey = SchemaUtil.getTableKey(tenantId, schemaName, 
indexName);
             // FIXME: Remove when unintentionally deprecated method is fixed 
(HBASE-7870).
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 5891de066b..b4d9ba4642 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.end2end;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.PhoenixMasterObserver;
@@ -30,9 +31,11 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.CDCUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -41,6 +44,7 @@ import org.junit.experimental.categories.Category;
 import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
 import 
org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -168,6 +172,42 @@ public class CDCStreamIT extends CDCBaseIT {
         assertStreamStatus(conn, tableName, streamName, 
CDCUtil.CdcStreamStatus.ENABLING);
     }
 
+    @Test
+    public void testStreamMetadataWhenTableIsDropped() throws SQLException {
+        Connection conn = newConnection();
+        MetaDataClient mdc = new 
MetaDataClient(conn.unwrap(PhoenixConnection.class));
+        String schemaName = "\"" + generateUniqueName().toLowerCase() + "\"";
+        String tableName = SchemaUtil.getTableName(schemaName, "\"" + 
generateUniqueName().toLowerCase() + "\"");
+        String unescapedFullTableName = 
SchemaUtil.getUnEscapedFullName(tableName);
+        String create_table_sql = "CREATE TABLE  " + tableName + " ( k INTEGER 
PRIMARY KEY," + " v1 INTEGER, v2 DATE)";
+        conn.createStatement().execute(create_table_sql);
+        String cdcName = "\"" + generateUniqueName().toLowerCase() + "\"";
+        String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+        conn.createStatement().execute(cdc_sql);
+        TaskRegionObserver.SelfHealingTask task =
+                new TaskRegionObserver.SelfHealingTask(
+                        taskRegionEnvironment,
+                        
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
+        task.run();
+        String partitionQuery = "SELECT * FROM SYSTEM.CDC_STREAM WHERE 
TABLE_NAME='" + unescapedFullTableName + "'";
+        ResultSet rs = conn.createStatement().executeQuery(partitionQuery);
+        Assert.assertTrue(rs.next());
+        String drop_table_sql = "DROP TABLE " + tableName;
+        
Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+        // check if stream metadata is cleared when table is dropped
+        conn.createStatement().execute(drop_table_sql);
+        
Assert.assertNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+        rs = conn.createStatement().executeQuery(partitionQuery);
+        Assert.assertFalse(rs.next());
+        // should be able to re-create same table with same cdc name and 
metadata should be populated
+        conn.createStatement().execute(create_table_sql);
+        conn.createStatement().execute(cdc_sql);
+        
Assert.assertNotNull(mdc.getStreamNameIfCDCEnabled(unescapedFullTableName));
+        task.run();
+        rs = conn.createStatement().executeQuery(partitionQuery);
+        Assert.assertTrue(rs.next());
+    }
+
     /**
      * Split the only region of the table with empty start key and empty end 
key.
      */

Reply via email to