This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new b45166dbab PHOENIX-7634 : Change stream name delimiter (#2185)
b45166dbab is described below
commit b45166dbabc7dcdf5504b2fb7ff1fff32f39ac98
Author: Palash Chauhan <[email protected]>
AuthorDate: Wed Jun 11 10:13:13 2025 -0700
PHOENIX-7634 : Change stream name delimiter (#2185)
Co-authored-by: Palash Chauhan
<[email protected]>
---
.../main/java/org/apache/phoenix/util/CDCUtil.java | 4 +-
.../org/apache/phoenix/end2end/CDCStreamIT.java | 61 ++++++++++++++++++----
2 files changed, 53 insertions(+), 12 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 3b3c8b5315..4dca1eb03c 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -41,8 +41,8 @@ import org.bson.RawBsonDocument;
public class CDCUtil {
public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX_";
- // phoenix-cdc-stream-{tableName}-{cdc object name}-{cdc index timestamp}
- public static String CDC_STREAM_NAME_FORMAT =
"phoenix-cdc-stream-%s-%s-%d";
+ // phoenix/cdc/stream/{tableName}/{cdc object name}/{cdc index timestamp}
+ public static String CDC_STREAM_NAME_FORMAT =
"phoenix/cdc/stream/%s/%s/%d";
/**
* Make a set of CDC change scope enums from the given string containing
comma separated scope
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
index 5061efe31b..6b97d118d6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCStreamIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.end2end;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -53,6 +54,9 @@ import java.util.stream.Collectors;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME;
import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME;
+import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE;
+import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE;
+import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE;
import static org.apache.phoenix.util.CDCUtil.CDC_STREAM_NAME_FORMAT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -61,6 +65,7 @@ import static org.junit.Assert.assertTrue;
@Category(NeedsOwnMiniClusterTest.class)
public class CDCStreamIT extends CDCBaseIT {
private static RegionCoprocessorEnvironment TaskRegionEnvironment;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -479,6 +484,52 @@ public class CDCStreamIT extends CDCBaseIT {
}
}
+ @Test
+ public void testGetRecords() throws Exception {
+ Connection conn = newConnection();
+ String tableName = generateUniqueName();
+ createTableAndEnableCDC(conn, tableName);
+
+ // upsert data
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('a', 1, 'foo')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('b', 2, 'bar')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('e', 3, 'alice')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('j', 4, 'bob')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('m', 5, 'cat')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('p', 6, 'cat')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('t', 7, 'cat')");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('z', 8, 'cat')");
+ conn.commit();
+
+ // get stream name
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT STREAM_NAME FROM SYSTEM.CDC_STREAM_STATUS WHERE
TABLE_NAME='" + tableName + "'");
+ Assert.assertTrue(rs.next());
+ String streamName = rs.getString(1);
+
+ // get partitions
+ rs = conn.createStatement().executeQuery("SELECT PARTITION_ID,
PARTITION_START_TIME FROM SYSTEM.CDC_STREAM " +
+ "WHERE TABLE_NAME='" + tableName + "' AND STREAM_NAME='" +
streamName + "'");
+ Assert.assertTrue(rs.next());
+ String partitionId = rs.getString(1);
+ long partitionStartTime = rs.getLong(2);
+
+ //get records
+ String cdcName = streamName.split("/")[4];
+ String sql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM %s WHERE
PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(? AS BIGINT) AS
TIMESTAMP) LIMIT ? ";
+ PreparedStatement stmt = conn.prepareStatement(String.format(sql,
cdcName));
+ stmt.setString(1, partitionId);
+ stmt.setLong(2, partitionStartTime);
+ stmt.setInt(3, 5);
+ rs = stmt.executeQuery();
+ while (rs.next()) {
+ String cdcJson = rs.getString(3);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcJson,
Map.class);
+ Assert.assertTrue(((Map<String,
Object>)map.get(CDC_PRE_IMAGE)).isEmpty());
+ Assert.assertEquals(CDC_UPSERT_EVENT_TYPE,
map.get(CDC_EVENT_TYPE));
+ }
+ }
+
private String getStreamName(Connection conn, String tableName, String
cdcName) throws SQLException {
return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
CDCUtil.getCDCCreationTimestamp(
conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
@@ -524,16 +575,6 @@ public class CDCStreamIT extends CDCBaseIT {
TaskRegionEnvironment,
QueryServicesOptions.DEFAULT_TASK_HANDLING_MAX_INTERVAL_MS);
task.run();
assertStreamStatus(conn, tableName, streamName,
CDCUtil.CdcStreamStatus.ENABLED);
-
- //upsert sample data
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('a', 1, 'foo')");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('b', 2, 'bar')");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('e', 3, 'alice')");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('j', 4, 'bob')");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('m', 5, 'cat')");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('p', 6, 'cat')");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('t', 7, 'cat')");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('z', 8, 'cat')");
}
/**