This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new d3c5d98c49 PHOENIX-7636 CDC on table with case-sensitive pk columns fails to read change records (#2186)
d3c5d98c49 is described below
commit d3c5d98c49ec6c7ec4b8c22482673057cc49d487
Author: Viraj Jasani <[email protected]>
AuthorDate: Wed Jun 11 17:16:49 2025 -0700
PHOENIX-7636 CDC on table with case-sensitive pk columns fails to read change records (#2186)
---
.../org/apache/phoenix/schema/MetaDataClient.java | 11 ++--
.../java/org/apache/phoenix/end2end/Bson3IT.java | 73 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 4 deletions(-)
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index a1cbfa10b4..54b3c7f254 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -2022,11 +2022,14 @@ public class MetaDataClient {
int pkOffset = dataTable.getBucketNum() != null ? 1 : 0;
for (int i = pkOffset; i < pkColumns.size(); ++i) {
PColumn pcol = pkColumns.get(i);
- columnDefs.add(FACTORY.columnDef(FACTORY.columnName(pcol.getName().getString()),
- pcol.getDataType().getSqlTypeName(), false, null, false, pcol.getMaxLength(),
- pcol.getScale(), false, pcol.getSortOrder(), "", null, false));
+ columnDefs.add(
+ FACTORY.columnDef(FACTORY.columnName("\"" + pcol.getName().getString() + "\""),
+ pcol.getDataType().getSqlTypeName(), false, null, false,
+ pcol.getMaxLength(), pcol.getScale(), false,
+ pcol.getSortOrder(), "", null, false));
pkColumnDefs.add(FACTORY.columnDefInPkConstraint(FACTORY.columnName(
- pcol.getName().getString()), pcol.getSortOrder(), pcol.isRowTimestamp()));
+ "\"" + pcol.getName().getString() + "\""), pcol.getSortOrder(),
+ pcol.isRowTimestamp()));
}
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null,
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
index 8a2070e183..1adad43dc3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson3IT.java
@@ -32,6 +32,7 @@ import org.bson.BsonBinary;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
+import org.bson.BsonInt32;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
@@ -57,6 +58,7 @@ import java.util.Properties;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -1860,4 +1862,75 @@ public class Bson3IT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testCDCWithCaseSenstitiveTableAndPks() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = "XYZ.\"test.table\"";
+ String cdcName = "XYZ.\"CDC_test.table\"";
+ String cdcNameWithoutSchema = "\"CDC_test.table\"";
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "CREATE TABLE " + tableName +
+ " (\"hk\" VARCHAR NOT NULL, COL BSON CONSTRAINT pk PRIMARY KEY(\"hk\"))";
+ conn.createStatement().execute(ddl);
+
+ String cdcDdl = "CREATE CDC " + cdcNameWithoutSchema + " ON " + tableName;
+ conn.createStatement().execute(cdcDdl);
+
+ String alterDdl = "ALTER TABLE " + tableName
+ + " SET SCHEMA_VERSION = 'NEW_AND_OLD_IMAGES'";
+ conn.createStatement().execute(alterDdl);
+
+ Timestamp ts1 = new Timestamp(System.currentTimeMillis());
+ Thread.sleep(100);
+
+ BsonDocument bsonDocument = new BsonDocument()
+ .append("field1", new BsonString("value1"))
+ .append("field2", new BsonInt32(42))
+ .append("field3", new BsonBoolean(true));
+
+ PreparedStatement stmt = conn.prepareStatement(
+ "UPSERT INTO " + tableName + " VALUES (?,?)");
+ stmt.setString(1, "key1");
+ stmt.setObject(2, bsonDocument);
+ stmt.executeUpdate();
+ conn.commit();
+
+ Thread.sleep(100);
+ Timestamp ts2 = new Timestamp(System.currentTimeMillis());
+
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT DISTINCT PARTITION_ID() FROM " + cdcName);
+ assertTrue("Expected one partition", rs.next());
+ String partitionId = rs.getString(1);
+ assertFalse("Expected only one partition", rs.next());
+
+ String cdcQuery = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName
+ + " WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() >= ? AND "
+ + "PHOENIX_ROW_TIMESTAMP() <= ?";
+ PreparedStatement ps = conn.prepareStatement(cdcQuery);
+ ps.setString(1, partitionId);
+ ps.setTimestamp(2, ts1);
+ ps.setTimestamp(3, ts2);
+
+ rs = ps.executeQuery();
+
+ assertTrue("Expected at least one CDC record", rs.next());
+
+ String cdcVal = rs.getString(3);
+ Map<String, Object> map = OBJECT_MAPPER.readValue(cdcVal, Map.class);
+
+ Map<String, Object> preImage = (Map<String, Object>) map.get(QueryConstants.CDC_PRE_IMAGE);
+ assertNull("Pre-image should be null for first insert", preImage.get("COL"));
+
+ Map<String, Object> postImage = (Map<String, Object>) map.get(QueryConstants.CDC_POST_IMAGE);
+ String encodedBytes = (String) postImage.get("COL");
+ byte[] bytes = Base64.getDecoder().decode(encodedBytes);
+ RawBsonDocument actualDoc = new RawBsonDocument(bytes, 0, bytes.length);
+ assertEquals("Post-image BSON document should match inserted document", bsonDocument,
+ actualDoc);
+
+ assertFalse("Should only have one CDC record", rs.next());
+ }
+ }
+
}