This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 21109328f72 [Java] Fix CassandraIO ReadFn to quote column names for reserved keywords (#36459)
21109328f72 is described below
commit 21109328f725bb9136316455db07507144df719f
Author: Suvrat Acharya <[email protected]>
AuthorDate: Wed Dec 3 02:11:02 2025 +0530
[Java] Fix CassandraIO ReadFn to quote column names for reserved keywords (#36459)
* Fix CassandraIO ReadFn to quote column names for reserved keywords
* Minor fixes
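For context, a minimal sketch of what the new quoting does: quoteIdentifier is applied to each partition-key column name before the names are joined and handed to generateRangeQuery. The class below is hypothetical and not part of the commit; it assumes it sits in org.apache.beam.sdk.io.cassandra, since the package-private ReadFn.quoteIdentifier is only visible there (the tests below rely on the same visibility), and the column names "key" and "select" mirror the reserved words used in those tests.

    package org.apache.beam.sdk.io.cassandra;

    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    // Hypothetical sketch class, same package as ReadFn so its package-private
    // static quoteIdentifier method is accessible.
    class QuoteIdentifierSketch {
      public static void main(String[] args) {
        // Column names the tests below treat as reserved keywords.
        String partitionKey =
            Stream.of("key", "select")
                .map(ReadFn::quoteIdentifier) // "key" -> "\"key\"", "select" -> "\"select\""
                .collect(Collectors.joining(","));
        // Prints: "key","select" -- each name double-quoted, so the joined string
        // can be embedded in the range query without a CQL syntax error.
        System.out.println(partitionKey);
      }
    }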
---
.../org/apache/beam/sdk/io/cassandra/ReadFn.java | 10 +
.../beam/sdk/io/cassandra/CassandraIOTest.java | 374 +++++++++++++++++++++
2 files changed, 384 insertions(+)
diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
index 678c72d42ff..8f16e729bc8 100644
--- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
+++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java
@@ -50,6 +50,7 @@ class ReadFn<T> extends DoFn<Read<T>, T> {
session.getCluster().getMetadata().getKeyspace(read.keyspace().get())
.getTable(read.table().get()).getPartitionKey().stream()
.map(ColumnMetadata::getName)
+ .map(ReadFn::quoteIdentifier)
.collect(Collectors.joining(","));
String query = generateRangeQuery(read, partitionKey, read.ringRanges() != null);
@@ -148,4 +149,13 @@ class ReadFn<T> extends DoFn<Read<T>, T> {
private static String getJoinerClause(String queryString) {
return queryString.toUpperCase().contains("WHERE") ? " AND " : " WHERE ";
}
+
+ static String quoteIdentifier(String identifier) {
+ if (identifier == null) {
+ return null;
+ }
+ // Escape any existing double quotes by doubling them
+ String escaped = identifier.replace("\"", "\"\"");
+ return "\"" + escaped + "\"";
+ }
}
diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index 747f803ea46..df52421db23 100644
--- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -844,4 +844,378 @@ public class CassandraIOTest implements Serializable {
/** Simple Cassandra entity used in write tests. */
@Table(name = CASSANDRA_TABLE_WRITE, keyspace = CASSANDRA_KEYSPACE)
static class ScientistWrite extends Scientist {}
+
+ /** Test the quoteIdentifier utility method with various inputs. */
+ @Test
+ public void testQuoteIdentifier() {
+ // Test normal identifiers
+ assertEquals("\"normal_column\"", ReadFn.quoteIdentifier("normal_column"));
+ assertEquals("\"myTable\"", ReadFn.quoteIdentifier("myTable"));
+ assertEquals("\"column123\"", ReadFn.quoteIdentifier("column123"));
+
+ // Test reserved keywords
+ assertEquals("\"true\"", ReadFn.quoteIdentifier("true"));
+ assertEquals("\"key\"", ReadFn.quoteIdentifier("key"));
+ assertEquals("\"select\"", ReadFn.quoteIdentifier("select"));
+ assertEquals("\"from\"", ReadFn.quoteIdentifier("from"));
+ assertEquals("\"where\"", ReadFn.quoteIdentifier("where"));
+ assertEquals("\"table\"", ReadFn.quoteIdentifier("table"));
+ assertEquals("\"keyspace\"", ReadFn.quoteIdentifier("keyspace"));
+
+ // Test identifiers with existing quotes (should be escaped by doubling)
+ assertEquals("\"column\"\"with\"\"quotes\"",
ReadFn.quoteIdentifier("column\"with\"quotes"));
+ assertEquals("\"single\"\"quote\"",
ReadFn.quoteIdentifier("single\"quote"));
+ assertEquals("\"\"\"starts_with_quote\"",
ReadFn.quoteIdentifier("\"starts_with_quote"));
+ assertEquals("\"ends_with_quote\"\"\"",
ReadFn.quoteIdentifier("ends_with_quote\""));
+
+ // Test edge cases
+ assertEquals("\"\"", ReadFn.quoteIdentifier(""));
+ assertNull(ReadFn.quoteIdentifier(null));
+
+ // Test special characters that might be in identifiers
+ assertEquals("\"column with spaces\"", ReadFn.quoteIdentifier("column with
spaces"));
+ assertEquals("\"column-with-dashes\"",
ReadFn.quoteIdentifier("column-with-dashes"));
+ assertEquals("\"column.with.dots\"",
ReadFn.quoteIdentifier("column.with.dots"));
+ }
+
+ /**
+ * Test reading from a table with reserved keyword column names. This integration test verifies
+ * the complete fix works end-to-end.
+ */
+ @Test
+ public void testReadWithReservedKeywordColumns() throws Exception {
+ String reservedTableName = "reserved_keywords_table";
+
+ // Create table with reserved keyword column names
+ String createTableQuery =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS %s.%s("
+ + "\"true\" text, \"key\" text, \"select\" text, normal_column
text, "
+ + "PRIMARY KEY (\"true\", \"key\")"
+ + ");",
+ CASSANDRA_KEYSPACE, reservedTableName);
+
+ session.execute(createTableQuery);
+
+ // Insert test data with reserved keyword column names
+ String insertQuery1 =
+ String.format(
+ "INSERT INTO %s.%s(\"true\", \"key\", \"select\", normal_column) "
+ + "VALUES ('true_value_1', 'key_value_1', 'select_value_1',
'normal_value_1');",
+ CASSANDRA_KEYSPACE, reservedTableName);
+ session.execute(insertQuery1);
+
+ String insertQuery2 =
+ String.format(
+ "INSERT INTO %s.%s(\"true\", \"key\", \"select\", normal_column) "
+ + "VALUES ('true_value_2', 'key_value_2', 'select_value_2',
'normal_value_2');",
+ CASSANDRA_KEYSPACE, reservedTableName);
+ session.execute(insertQuery2);
+
+ // Flush to ensure data is written
+ flushMemTablesAndRefreshSizeEstimates();
+
+ // Test reading with CassandraIO - this should work with the fix
+ PCollection<ReservedKeywordEntity> output =
+ pipeline.apply(
+ CassandraIO.<ReservedKeywordEntity>read()
+ .withHosts(Collections.singletonList(CASSANDRA_HOST))
+ .withPort(cassandraPort)
+ .withKeyspace(CASSANDRA_KEYSPACE)
+ .withTable(reservedTableName)
+ .withCoder(SerializableCoder.of(ReservedKeywordEntity.class))
+ .withEntity(ReservedKeywordEntity.class));
+
+ // Verify we can read the data successfully
+ PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(2L);
+
+ PAssert.that(output)
+ .satisfies(
+ input -> {
+ List<ReservedKeywordEntity> entities = new ArrayList<>();
+ input.forEach(entities::add);
+
+ assertEquals(2, entities.size());
+
+ // Check that data was read correctly
+ boolean foundFirst = false, foundSecond = false;
+ for (ReservedKeywordEntity entity : entities) {
+ if ("true_value_1".equals(entity.trueColumn)) {
+ assertEquals("key_value_1", entity.keyColumn);
+ assertEquals("select_value_1", entity.selectColumn);
+ assertEquals("normal_value_1", entity.normalColumn);
+ foundFirst = true;
+ } else if ("true_value_2".equals(entity.trueColumn)) {
+ assertEquals("key_value_2", entity.keyColumn);
+ assertEquals("select_value_2", entity.selectColumn);
+ assertEquals("normal_value_2", entity.normalColumn);
+ foundSecond = true;
+ }
+ }
+
+ assertTrue("Should find first test record", foundFirst);
+ assertTrue("Should find second test record", foundSecond);
+ return null;
+ });
+
+ pipeline.run();
+
+ // Clean up test table
+ session.execute(
+ String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE,
reservedTableName));
+ }
+
+ /** Test reading with a custom query that includes reserved keyword column names. */
+ @Test
+ public void testReadWithCustomQueryAndReservedKeywords() throws Exception {
+ String customQueryTableName = "custom_query_test";
+
+ // Create table with reserved keyword column names
+ String createTableQuery =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS %s.%s("
+ + "\"from\" text, \"where\" text, data text, "
+ + "PRIMARY KEY (\"from\", \"where\")"
+ + ");",
+ CASSANDRA_KEYSPACE, customQueryTableName);
+
+ session.execute(createTableQuery);
+
+ // Insert test data
+ String insertQuery =
+ String.format(
+ "INSERT INTO %s.%s(\"from\", \"where\", data) "
+ + "VALUES ('source1', 'condition1', 'test_data');",
+ CASSANDRA_KEYSPACE, customQueryTableName);
+ session.execute(insertQuery);
+
+ // Test with custom query that has WHERE clause - this tests the query building logic
+ String customQuery =
+ String.format(
+ "SELECT \"from\", \"where\", data FROM %s.%s WHERE
\"from\"='source1'",
+ CASSANDRA_KEYSPACE, customQueryTableName);
+
+ PCollection<CustomQueryEntity> output =
+ pipeline.apply(
+ CassandraIO.<CustomQueryEntity>read()
+ .withHosts(Collections.singletonList(CASSANDRA_HOST))
+ .withPort(cassandraPort)
+ .withKeyspace(CASSANDRA_KEYSPACE)
+ .withTable(customQueryTableName)
+ .withQuery(customQuery)
+ .withCoder(SerializableCoder.of(CustomQueryEntity.class))
+ .withEntity(CustomQueryEntity.class));
+
+ PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
+
+ PAssert.that(output)
+ .satisfies(
+ input -> {
+ CustomQueryEntity entity = input.iterator().next();
+ assertEquals("source1", entity.fromColumn);
+ assertEquals("condition1", entity.whereColumn);
+ assertEquals("test_data", entity.data);
+ return null;
+ });
+
+ pipeline.run();
+
+ // Clean up
+ session.execute(
+ String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE,
customQueryTableName));
+ }
+
+ /** Test that the fix handles multiple partition key columns with reserved keywords. */
+ @Test
+ public void testMultiplePartitionKeyReservedWords() throws Exception {
+ String multiPartitionTableName = "multi_partition_test";
+
+ // Create table with multiple partition key columns that are reserved keywords
+ String createTableQuery =
+ String.format(
+ "CREATE TABLE IF NOT EXISTS %s.%s("
+ + "\"table\" text, \"index\" text, \"value\" text, data text, "
+ + "PRIMARY KEY ((\"table\", \"index\"), \"value\")"
+ + ");",
+ CASSANDRA_KEYSPACE, multiPartitionTableName);
+
+ session.execute(createTableQuery);
+
+ // Insert test data
+ String insertQuery =
+ String.format(
+ "INSERT INTO %s.%s(\"table\", \"index\", \"value\", data) "
+ + "VALUES ('table1', 'index1', 'value1', 'test_data');",
+ CASSANDRA_KEYSPACE, multiPartitionTableName);
+ session.execute(insertQuery);
+
+ PCollection<MultiPartitionEntity> output =
+ pipeline.apply(
+ CassandraIO.<MultiPartitionEntity>read()
+ .withHosts(Collections.singletonList(CASSANDRA_HOST))
+ .withPort(cassandraPort)
+ .withKeyspace(CASSANDRA_KEYSPACE)
+ .withTable(multiPartitionTableName)
+ .withCoder(SerializableCoder.of(MultiPartitionEntity.class))
+ .withEntity(MultiPartitionEntity.class));
+
+ PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
+
+ PAssert.that(output)
+ .satisfies(
+ input -> {
+ MultiPartitionEntity entity = input.iterator().next();
+ assertEquals("table1", entity.tableColumn);
+ assertEquals("index1", entity.indexColumn);
+ assertEquals("value1", entity.valueColumn);
+ assertEquals("test_data", entity.data);
+ return null;
+ });
+
+ pipeline.run();
+
+ // Clean up
+ session.execute(
+ String.format("DROP TABLE IF EXISTS %s.%s", CASSANDRA_KEYSPACE,
multiPartitionTableName));
+ }
+
+ /** Test that normal (non-reserved) identifiers still work correctly after the fix. */
+ @Test
+ public void testNormalIdentifiersStillWork() throws Exception {
+ // This test uses the existing CASSANDRA_TABLE which has normal column names
+ // to ensure our changes don't break existing functionality
+
+ PCollection<Scientist> output =
+ pipeline.apply(
+ CassandraIO.<Scientist>read()
+ .withHosts(Collections.singletonList(CASSANDRA_HOST))
+ .withPort(cassandraPort)
+ .withKeyspace(CASSANDRA_KEYSPACE)
+ .withTable(CASSANDRA_TABLE)
+ .withCoder(SerializableCoder.of(Scientist.class))
+ .withEntity(Scientist.class));
+
+ PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS);
+
+ pipeline.run();
+ }
+
+ // Add these entity classes after the existing entity classes at the end of the file
+
+ /** Test entity class for reserved keyword column names to verify identifier quoting. */
+ @Table(name = "reserved_keywords_table", keyspace = CASSANDRA_KEYSPACE)
+ static class ReservedKeywordEntity implements Serializable {
+
+ @PartitionKey
+ @Column(name = "true") // Reserved keyword as column name
+ String trueColumn;
+
+ @ClusteringColumn
+ @Column(name = "key") // Reserved keyword as column name
+ String keyColumn;
+
+ @Column(name = "select") // Reserved keyword as column name
+ String selectColumn;
+
+ @Column(name = "normal_column") // Normal column name
+ String normalColumn;
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ReservedKeywordEntity that = (ReservedKeywordEntity) o;
+ return Objects.equal(trueColumn, that.trueColumn)
+ && Objects.equal(keyColumn, that.keyColumn)
+ && Objects.equal(selectColumn, that.selectColumn)
+ && Objects.equal(normalColumn, that.normalColumn);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(trueColumn, keyColumn, selectColumn, normalColumn);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ReservedKeywordEntity{true='%s', key='%s', select='%s',
normal='%s'}",
+ trueColumn, keyColumn, selectColumn, normalColumn);
+ }
+ }
+
+ /** Test entity for custom query test with reserved keyword column names. */
+ @Table(name = "custom_query_test", keyspace = CASSANDRA_KEYSPACE)
+ static class CustomQueryEntity implements Serializable {
+ @PartitionKey
+ @Column(name = "from")
+ String fromColumn;
+
+ @ClusteringColumn
+ @Column(name = "where")
+ String whereColumn;
+
+ @Column String data;
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CustomQueryEntity that = (CustomQueryEntity) o;
+ return Objects.equal(fromColumn, that.fromColumn)
+ && Objects.equal(whereColumn, that.whereColumn)
+ && Objects.equal(data, that.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(fromColumn, whereColumn, data);
+ }
+ }
+
+ /** Test entity for multiple partition key test with reserved keywords. */
+ @Table(name = "multi_partition_test", keyspace = CASSANDRA_KEYSPACE)
+ static class MultiPartitionEntity implements Serializable {
+ @PartitionKey(0)
+ @Column(name = "table")
+ String tableColumn;
+
+ @PartitionKey(1)
+ @Column(name = "index")
+ String indexColumn;
+
+ @ClusteringColumn
+ @Column(name = "value")
+ String valueColumn;
+
+ @Column String data;
+
+ @Override
+ public boolean equals(@Nullable Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MultiPartitionEntity that = (MultiPartitionEntity) o;
+ return Objects.equal(tableColumn, that.tableColumn)
+ && Objects.equal(indexColumn, that.indexColumn)
+ && Objects.equal(valueColumn, that.valueColumn)
+ && Objects.equal(data, that.data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(tableColumn, indexColumn, valueColumn, data);
+ }
+ }
}