This is an automated email from the ASF dual-hosted git repository.
palashc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new be04ae882f PHOENIX-7642 : Add CDC Stream creation datetime to stream name (#2194)
be04ae882f is described below
commit be04ae882f8d1f4d17c3327a0d66d1cbcd91bab6
Author: Palash Chauhan <[email protected]>
AuthorDate: Tue Jun 17 19:21:28 2025 -0700
PHOENIX-7642 : Add CDC Stream creation datetime to stream name (#2194)
Co-authored-by: Palash Chauhan <[email protected]>
---
.../java/org/apache/phoenix/schema/MetaDataClient.java | 8 ++++++--
.../src/main/java/org/apache/phoenix/util/CDCUtil.java | 15 +++++++++++++--
.../src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java | 7 ++++---
3 files changed, 23 insertions(+), 7 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 54b3c7f254..96f11462c8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2067,7 +2067,9 @@ public class MetaDataClient {
// create Stream with ENABLING status
long cdcIndexTimestamp =
CDCUtil.getCDCCreationTimestamp(connection.getTable(tableName));
String streamStatusSQL = "UPSERT INTO " +
SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
- String streamName = String.format(CDC_STREAM_NAME_FORMAT, tableName,
cdcObjName, cdcIndexTimestamp);
+ String streamName = String.format(CDC_STREAM_NAME_FORMAT,
+ tableName, cdcObjName, cdcIndexTimestamp,
+ CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
try (PreparedStatement ps =
connection.prepareStatement(streamStatusSQL)) {
ps.setString(1, tableName);
ps.setString(2, streamName);
@@ -4020,7 +4022,9 @@ public class MetaDataClient {
// Mark CDC Stream as Disabled
long cdcIndexTimestamp = connection.getTable(indexName).getTimeStamp();
String streamStatusSQL = "UPSERT INTO " +
SYSTEM_CDC_STREAM_STATUS_NAME + " VALUES (?, ?, ?)";
- String streamName = String.format(CDC_STREAM_NAME_FORMAT,
parentTableName, cdcTableName, cdcIndexTimestamp);
+ String streamName = String.format(CDC_STREAM_NAME_FORMAT,
+ parentTableName, cdcTableName, cdcIndexTimestamp,
+ CDCUtil.getCDCCreationUTCDateTime(cdcIndexTimestamp));
try (PreparedStatement ps =
connection.prepareStatement(streamStatusSQL)) {
ps.setString(1, parentTableName);
ps.setString(2, streamName);
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
index 4dca1eb03c..3117c5cebc 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCUtil.java
@@ -20,12 +20,16 @@ package org.apache.phoenix.util;
import java.sql.SQLException;
import java.sql.Types;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.Base64;
+import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.StringTokenizer;
+import java.util.TimeZone;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -41,8 +45,8 @@ import org.bson.RawBsonDocument;
public class CDCUtil {
public static final String CDC_INDEX_PREFIX = "PHOENIX_CDC_INDEX_";
- // phoenix/cdc/stream/{tableName}/{cdc object name}/{cdc index timestamp}
- public static String CDC_STREAM_NAME_FORMAT =
"phoenix/cdc/stream/%s/%s/%d";
+ // phoenix/cdc/stream/{tableName}/{cdc object name}/{cdc index timestamp}/{cdc index creation datetime}
+ public static String CDC_STREAM_NAME_FORMAT =
"phoenix/cdc/stream/%s/%s/%d/%s";
/**
* Make a set of CDC change scope enums from the given string containing comma separated scope
@@ -180,4 +184,11 @@ public class CDCUtil {
}
return -1;
}
+
+ public static String getCDCCreationUTCDateTime(long timestamp) {
+ Date date = new Date(timestamp);
+ DateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ format.setTimeZone(TimeZone.getTimeZone("Etc/UTC"));
+ return format.format(date);
+ }
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 3fb604d8d0..4956245b1c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -1019,9 +1019,10 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
*/
public String getStreamName(Connection conn, String tableName, String
cdcName)
throws SQLException {
- return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
- CDCUtil.getCDCCreationTimestamp(
-
conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName)));
+ long creationTS = CDCUtil.getCDCCreationTimestamp(
+
conn.unwrap(PhoenixConnection.class).getTableNoCache(tableName));
+ return String.format(CDC_STREAM_NAME_FORMAT, tableName, cdcName,
creationTS,
+ CDCUtil.getCDCCreationUTCDateTime(creationTS));
}
/**