This is an automated email from the ASF dual-hosted git repository.

palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new b45166dbab PHOENIX-7634 : Change stream name delimiter (#2185)
b45166dbab is described below

commit b45166dbabc7dcdf5504b2fb7ff1fff32f39ac98
Author: Palash Chauhan <[email protected]>
AuthorDate: Wed Jun 11 10:13:13 2025 -0700

    PHOENIX-7634 : Change stream name delimiter (#2185)
    
    Co-authored-by: Palash Chauhan 
<[email protected]>
---
 .../main/java/org/apache/phoenix/util/CDCUtil.java |  4 +-
 .../org/apache/phoenix/end2end/CDCStreamIT.java    | 61 ++++++++++++++++++----
 2 files changed, 53 insertions(+), 12 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 3b3c8b5315..4dca1eb03c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -41,8 +41,8 @@ import org.bson.RawBsonDocument;
 public class CDCUtil {
     public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX_";
 
-    // phoenix-cdc-stream-{tableName}-{cdc object name}-{cdc index timestamp}
-    public static String CDC_STREAM_NAME_FORMAT = 
"phoenix-cdc-stream-%s-%s-%d";
+    // phoenix/cdc/stream/{tableName}/{cdc object name}/{cdc index timestamp}
+    public static String CDC_STREAM_NAME_FORMAT = 
"phoenix/cdc/stream/%s/%s/%d";
 
     /**
      * Make a set of CDC change scope enums from the given string containing 
comma separated scope
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 5061efe31b..6b97d118d6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,6 +18,7 @@
 
 package org.apache.phoenix.end2end;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -53,6 +54,9 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
 import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -61,6 +65,7 @@ import static org.junit.Assert.assertTrue;
 @Category(NeedsOwnMiniClusterTest.class)
 public class CDCStreamIT extends CDCBaseIT {
     private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
@@ -479,6 +484,52 @@ public class CDCStreamIT extends CDCBaseIT {
         }
     }
 
+    @Test
+    public void testGetRecords() throws Exception {
+        Connection conn = newConnection();
+        String tableName = generateUniqueName();
+        createTableAndEnableCDC(conn, tableName);
+
+        // upsert data
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('a', 1, 'foo')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('b', 2, 'bar')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('e', 3, 'alice')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('j', 4, 'bob')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('m', 5, 'cat')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('p', 6, 'cat')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('t', 7, 'cat')");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('z', 8, 'cat')");
+        conn.commit();
+
+        // get stream name
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT STREAM_NAME FROM SYSTEM.CDC_STREAM_STATUS WHERE 
TABLE_NAME='" + tableName + "'");
+        Assert.assertTrue(rs.next());
+        String streamName = rs.getString(1);
+
+        // get partitions
+        rs = conn.createStatement().executeQuery("SELECT PARTITION_ID, 
PARTITION_START_TIME FROM SYSTEM.CDC_STREAM " +
+                "WHERE TABLE_NAME='" + tableName + "' AND STREAM_NAME='" + 
streamName + "'");
+        Assert.assertTrue(rs.next());
+        String partitionId = rs.getString(1);
+        long partitionStartTime = rs.getLong(2);
+
+        //get records
+        String cdcName = streamName.split("/")[4];
+        String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ *  FROM %s WHERE 
PARTITION_ID() = ?  AND PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(? AS BIGINT) AS 
TIMESTAMP) LIMIT ? ";
+        PreparedStatement stmt = conn.prepareStatement(String.format(sql, 
cdcName));
+        stmt.setString(1, partitionId);
+        stmt.setLong(2, partitionStartTime);
+        stmt.setInt(3, 5);
+        rs = stmt.executeQuery();
+        while (rs.next()) {
+            String cdcJson = rs.getString(3);
+            Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson, 
Map.class);
+            Assert.assertTrue(((Map<String, 
Object>)map.get(CDC_PRE_IMAGE)).isEmpty());
+            Assert.assertEquals(CDC_UPSERT_EVENT_TYPE, 
map.get(CDC_EVENT_TYPE));
+        }
+    }
+
     private String getStreamName(Connection conn, String tableName, String 
cdcName) throws SQLException {
         return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName, 
CDCUtil.getCDCCreationTimestamp(
                 
conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
@@ -524,16 +575,6 @@ public class CDCStreamIT extends CDCBaseIT {
                         TaskRegionEnvironment, 
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
         task.run();
         assertStreamStatus(conn, tableName, streamName, 
CDCUtil.CdcStreamStatus.ENABLED);
-
-        //upsert sample data
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('a', 1, 'foo')");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('b', 2, 'bar')");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('e', 3, 'alice')");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('j', 4, 'bob')");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('m', 5, 'cat')");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('p', 6, 'cat')");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('t', 7, 'cat')");
-        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES 
('z', 8, 'cat')");
     }
 
     /**

Reply via email to