This is an automated email from the ASF dual-hosted git repository.

saranyak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c3e31eac CASSANALYTICS-120: Quote identifiers option must be set to true if ttl has mixed case column name (#172)
c3e31eac is described below

commit c3e31eac9b0a60a569bba47de28beea33e357aec
Author: Saranya Krishnakumar <[email protected]>
AuthorDate: Wed Feb 25 10:12:31 2026 -0800

    CASSANALYTICS-120: Quote identifiers option must be set to true if ttl has mixed case column name (#172)
    
    Patch by Saranya Krishnakumar; Reviewed by Francisco Guerrero for CASSANALYTICS-120
---
 CHANGES.txt                                        |  1 +
 .../cassandra/spark/bulkwriter/TableSchema.java    | 54 ++++++++++++++++++++--
 .../spark/bulkwriter/TableSchemaTest.java          | 48 +++++++++++++++++++
 .../spark/bulkwriter/TableSchemaTestCommon.java    | 33 +++++++++----
 .../cassandra/analytics/BulkWriteTtlTest.java      | 22 +++++++++
 .../cassandra/analytics/DataGenerationUtils.java   |  7 ++-
 6 files changed, 150 insertions(+), 15 deletions(-)
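
For context, a minimal writer-side usage sketch of what this change enforces (drawn from the integration test added in this patch; bulkWriterDataFrameWriter is that test's helper and the "updatedTTL" column name is only illustrative). When the per-row TTL column has a mixed-case name, QUOTE_IDENTIFIERS must be enabled, otherwise TableSchema construction now fails fast instead of silently overwriting existing TTL values with null:

    // Per-row TTL supplied via a mixed-case DataFrame column
    Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, 1, "updatedTTL", null, ROW_COUNT);
    bulkWriterDataFrameWriter(df, MIXED_CASE_TTL_NAME)
            .option(WriterOptions.TTL.name(), TTLOption.perRow("updatedTTL"))
            // Required for the mixed-case identifier; omitting it now throws IllegalArgumentException
            .option(WriterOptions.QUOTE_IDENTIFIERS.name(), true)
            .save();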

diff --git a/CHANGES.txt b/CHANGES.txt
index caf58b05..7588c8b8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Quote identifiers option must be set to true if ttl has mixed case column name (CASSANALYTICS-120)
  * Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption (CASSANALYTICS-116)
  * Fix race condition in DirectStreamSession#onSSTablesProduced and SortedSStableWriter#close (CASSANALYTICS-107)
  * Address LZ4 vulnerability (CVE-2025-12183) (CASSANALYTICS-109)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 93f02c14..8f0bbcef 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -73,12 +73,14 @@ public class TableSchema
     {
         this.writeMode = writeMode;
         this.ttlOption = ttlOption;
+
         this.timestampOption = timestampOption;
         this.lowestCassandraVersion = lowestCassandraVersion;
         this.quoteIdentifiers = quoteIdentifiers;
 
         validateDataFrameCompatibility(dfSchema, tableInfo);
         validateNoSecondaryIndexes(tableInfo);
+        validateUserAddedColumns(lowestCassandraVersion, quoteIdentifiers, ttlOption, timestampOption);
 
         this.createStatement = getCreateStatement(tableInfo);
         this.modificationStatement = getModificationStatement(dfSchema, tableInfo);
@@ -182,6 +184,7 @@ public class TableSchema
                                       TimestampOption timestampOption)
     {
         CassandraBridge bridge = CassandraBridgeFactory.get(lowestCassandraVersion);
+
         List<String> columnNames = Arrays.stream(dfSchema.fieldNames())
                                          .filter(fieldName -> !fieldName.equals(ttlOption.columnName()))
                                          .filter(fieldName -> !fieldName.equals(timestampOption.columnName()))
@@ -260,7 +263,7 @@ public class TableSchema
         Set<String> requiredKeyColumns = new LinkedHashSet<>(getRequiredKeyColumns(tableInfo));
         Preconditions.checkArgument(requiredKeyColumns.equals(dfFields),
                                     String.format("Only partition key columns (%s) are supported in the input Dataframe"
-                                                + " when WRITE_MODE=DELETE_PARTITION but (%s) columns were provided",
+                                                  + " when WRITE_MODE=DELETE_PARTITION but (%s) columns were provided",
                                                   String.join(",", requiredKeyColumns), String.join(",", dfFields)));
     }
 
@@ -269,10 +272,11 @@ public class TableSchema
         // Make sure all primary key columns are provided
         List<String> requiredKeyColumns = getRequiredKeyColumns(tableInfo);
         Preconditions.checkArgument(dfFields.containsAll(requiredKeyColumns),
-                                    "Missing some required key components in DataFrame => " + requiredKeyColumns
-                                            .stream()
-                                            .filter(column -> !dfFields.contains(column))
-                                            .collect(Collectors.joining(",")));
+                                    "Missing some required key components in DataFrame => "
+                                    + requiredKeyColumns
+                                      .stream()
+                                      .filter(column -> !dfFields.contains(column))
+                                      .collect(Collectors.joining(",")));
     }
 
     private static void validateDataframeFieldsInTable(TableInfoProvider tableInfo, Set<String> dfFields,
@@ -307,4 +311,44 @@ public class TableSchema
                           .map(dfFieldNames::indexOf)
                           .collect(Collectors.toList());
     }
+
+    private static void validateUserAddedColumns(String lowestCassandraVersion, boolean quoteIdentifiers,
+                                                 TTLOption ttlOption, TimestampOption timestampOption)
+    {
+        if (!quoteIdentifiers)
+        {
+            CassandraBridge bridge = CassandraBridgeFactory.get(lowestCassandraVersion);
+            validateColumnName(bridge, ttlOption.columnName(), WriterOptions.TTL.name());
+            validateColumnName(bridge, timestampOption.columnName(), WriterOptions.TIMESTAMP.name());
+        }
+    }
+
+    /**
+     * Validates that the provided column name matches what would be produced by maybeQuoteIdentifier. If they don't
+     * match, the user provided a column name that needs quoting but did not enable the QUOTE_IDENTIFIERS option.
+     * We throw early to avoid scenarios where a mismatch in column names leads to the bulk write overwriting existing
+     * TTL values with null.
+     *
+     * @param bridge     the Cassandra bridge
+     * @param columnName the column name to validate
+     * @param optionName the option name for error messages
+     * @throws IllegalArgumentException if the column name requires quoting but QUOTE_IDENTIFIERS is not enabled
+     */
+    private static void validateColumnName(CassandraBridge bridge, String columnName, String optionName)
+    {
+        if (columnName == null || columnName.isEmpty())
+        {
+            return;
+        }
+
+        String quotedName = bridge.maybeQuoteIdentifier(columnName);
+        if (!columnName.equals(quotedName))
+        {
+            throw new IllegalArgumentException(
+            String.format("The %s column name %s requires spark option %s set to true for correct conversion. Bulk " +
+                          "write should provide a column name that matches CQL requirements, or set %s to true to " +
+                          "enable quoting for all identifiers.", optionName, columnName,
+                          WriterOptions.QUOTE_IDENTIFIERS.name(), WriterOptions.QUOTE_IDENTIFIERS.name()));
+        }
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index 257034bf..266caed5 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -270,6 +270,54 @@ public class TableSchemaTest
                 .hasMessage("Bulkwriter doesn't support secondary indexes");
     }
 
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+    public void testMixedCaseTTLColumnNameWithoutQuoteIdentifiersFails(String cassandraVersion)
+    {
+        assertThatThrownBy(() -> getValidSchemaBuilder(cassandraVersion)
+                .withTTLSetting(TTLOption.from("updatedTTL"))
+                .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("The TTL column name updatedTTL requires spark option QUOTE_IDENTIFIERS set to true for correct conversion");
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+    public void testMixedCaseTimestampColumnNameWithoutQuoteIdentifiersFails(String cassandraVersion)
+    {
+        assertThatThrownBy(() -> getValidSchemaBuilder(cassandraVersion)
+                .withTimeStampSetting(TimestampOption.from("updatedTimestamp"))
+                .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("The TIMESTAMP column name updatedTimestamp requires spark option QUOTE_IDENTIFIERS set to true for correct conversion");
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+    public void testMixedCaseTTLColumnNameWithQuoteIdentifiersSucceeds(String cassandraVersion)
+    {
+        TableSchema schema = getValidSchemaBuilder(cassandraVersion)
+                .withTTLSetting(TTLOption.from("updatedTTL"))
+                .withQuotedIdentifiers()
+                .build();
+        assertThat(schema).isNotNull();
+        assertThat(trimUniqueTableName(schema.modificationStatement))
+                .contains("USING TTL :\"updatedTTL\"");
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+    public void testMixedCaseTimestampColumnNameWithQuoteIdentifiersSucceeds(String cassandraVersion)
+    {
+        TableSchema schema = getValidSchemaBuilder(cassandraVersion)
+                .withTimeStampSetting(TimestampOption.from("updatedTimestamp"))
+                .withQuotedIdentifiers()
+                .build();
+        assertThat(schema).isNotNull();
+        assertThat(trimUniqueTableName(schema.modificationStatement))
+                .contains("USING TIMESTAMP :\"updatedTimestamp\"");
+    }
+
     private TableSchemaTestCommon.MockTableSchemaBuilder getValidSchemaBuilder(String cassandraVersion)
     {
         return new TableSchemaTestCommon.MockTableSchemaBuilder(CassandraBridgeFactory.get(cassandraVersion))
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index 32fddffc..cff2e6f4 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -270,6 +270,16 @@ public final class TableSchemaTestCommon
             return this;
         }
 
+        private ImmutableMap<String, CqlField.CqlType> addColumnToCqlColumns(ImmutableMap<String, CqlField.CqlType> currentColumns,
+                                                                              String columnName,
+                                                                              String cqlType)
+        {
+            ImmutableMap.Builder<String, CqlField.CqlType> builder = ImmutableMap.builder();
+            builder.putAll(currentColumns);
+            builder.put(columnName, mockCqlType(cqlType));
+            return builder.build();
+        }
+
         public TableSchema build()
         {
             Objects.requireNonNull(cqlColumns,
@@ -286,21 +296,26 @@ public final class TableSchemaTestCommon
                                    "writeMode cannot be null. Please provide the write mode by calling #withWriteMode");
             Objects.requireNonNull(dataFrameSchema,
                                    "dataFrameSchema cannot be null. Please provide the write mode by calling #withDataFrameSchema");
-            MockTableInfoProvider tableInfoProvider = new MockTableInfoProvider(bridge,
-                                                                                cqlColumns,
-                                                                                partitionKeyColumns,
-                                                                                partitionKeyColumnTypes,
-                                                                                primaryKeyColumnNames,
-                                                                                cassandraVersion,
-                                                                                quoteIdentifiers);
+
+            ImmutableMap<String, CqlField.CqlType> updatedCqlColumns = cqlColumns;
             if (ttlOption.withTTl() && ttlOption.columnName() != null)
             {
-                dataFrameSchema = dataFrameSchema.add("ttl", DataTypes.IntegerType);
+                dataFrameSchema = dataFrameSchema.add(ttlOption.columnName(), DataTypes.IntegerType);
+                updatedCqlColumns = addColumnToCqlColumns(updatedCqlColumns, ttlOption.columnName(), SqlToCqlTypeConverter.INT);
             }
             if (timestampOption.withTimestamp() && timestampOption.columnName() != null)
             {
-                dataFrameSchema = dataFrameSchema.add("timestamp", DataTypes.IntegerType);
+                dataFrameSchema = dataFrameSchema.add(timestampOption.columnName(), DataTypes.LongType);
+                updatedCqlColumns = addColumnToCqlColumns(updatedCqlColumns, timestampOption.columnName(), SqlToCqlTypeConverter.BIGINT);
             }
             }
+
+            MockTableInfoProvider tableInfoProvider = new MockTableInfoProvider(bridge,
+                                                                                updatedCqlColumns,
+                                                                                partitionKeyColumns,
+                                                                                partitionKeyColumnTypes,
+                                                                                primaryKeyColumnNames,
+                                                                                cassandraVersion,
+                                                                                quoteIdentifiers);
             return new TableSchema(dataFrameSchema, tableInfoProvider, writeMode, ttlOption, timestampOption, cassandraVersion, quoteIdentifiers);
         }
     }
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
index d5d198eb..bed89e26 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
@@ -44,6 +44,7 @@ class BulkWriteTtlTest extends SharedClusterSparkIntegrationTestBase
     static final QualifiedName DEFAULT_TTL_NAME = new QualifiedName(TEST_KEYSPACE, "test_default_ttl");
     static final QualifiedName CONSTANT_TTL_NAME = new QualifiedName(TEST_KEYSPACE, "test_ttl_constant");
     static final QualifiedName PER_ROW_TTL_NAME = new QualifiedName(TEST_KEYSPACE, "test_ttl_per_row");
+    static final QualifiedName MIXED_CASE_TTL_NAME = new QualifiedName(TEST_KEYSPACE, "test_ttl_mixed_case");
 
     @Test
     void testTableDefaultTtl()
@@ -87,6 +88,21 @@ class BulkWriteTtlTest extends SharedClusterSparkIntegrationTestBase
         assertThat(result.hasNext()).isFalse();
     }
 
+    @Test
+    void testTtlOptionPerRowWithMixedCaseColumnName()
+    {
+        SparkSession spark = getOrCreateSparkSession();
+        Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, 1, "updatedTTL", null, ROW_COUNT);
+
+        bulkWriterDataFrameWriter(df, MIXED_CASE_TTL_NAME).option(WriterOptions.TTL.name(), TTLOption.perRow("updatedTTL"))
+                                                          .option(WriterOptions.QUOTE_IDENTIFIERS.name(), true)
+                                                          .save();
+        // Wait to make sure TTLs have expired
+        Uninterruptibles.sleepUninterruptibly(1100, TimeUnit.MILLISECONDS);
+        SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT * FROM " + MIXED_CASE_TTL_NAME, ConsistencyLevel.ALL);
+        assertThat(result.hasNext()).isFalse();
+    }
+
     @Override
     protected ClusterBuilderConfiguration testClusterConfiguration()
     {
@@ -117,5 +133,11 @@ class BulkWriteTtlTest extends SharedClusterSparkIntegrationTestBase
                                                      + "          marks BIGINT\n"
                                                      + "     );"
         );
+        cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " + MIXED_CASE_TTL_NAME + " (\n"
+                                                     + "          id BIGINT PRIMARY KEY,\n"
+                                                     + "          course TEXT,\n"
+                                                     + "          marks BIGINT\n"
+                                                     + "     );"
+        );
     }
 }
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java
index 0503c034..8c4623c9 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java
@@ -153,6 +153,11 @@ public final class DataGenerationUtils
     }
 
     public static Dataset<Row> generateCourseData(SparkSession spark, Integer ttl, Long timestamp, int rowCount)
+    {
+        return generateCourseData(spark, ttl, "ttl", timestamp, rowCount);
+    }
+
+    public static Dataset<Row> generateCourseData(SparkSession spark, Integer ttl, String ttlColumnName, Long timestamp, int rowCount)
     {
         SQLContext sql = spark.sqlContext();
         // Note: only primary key columns are required to be not nullable; All columns are nullable only for test convenience
@@ -163,7 +168,7 @@ public final class DataGenerationUtils
 
         if (ttl != null)
         {
-            schema = schema.add("ttl", IntegerType, false);
+            schema = schema.add(ttlColumnName, IntegerType, false);
         }
         if (timestamp != null)
         {

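For readers skimming the patch, a self-contained sketch of the new fail-fast check follows. The quoting rule below is a simplified assumption standing in for CassandraBridge#maybeQuoteIdentifier (which the real validateColumnName delegates to); only the shape of the check mirrors the commit:

    // Simplified assumption: an identifier that is not already plain lower-case
    // snake_case would be double-quoted in CQL.
    static String maybeQuoteIdentifier(String name)
    {
        return name.matches("[a-z0-9_]+") ? name : "\"" + name + "\"";
    }

    // Shape of the patch's validateColumnName: if quoting would change the
    // user-supplied TTL/TIMESTAMP column name and QUOTE_IDENTIFIERS is off,
    // fail fast rather than let the mismatched name null out existing TTLs.
    static void validateColumnName(String columnName, String optionName)
    {
        if (columnName == null || columnName.isEmpty())
        {
            return;
        }
        if (!columnName.equals(maybeQuoteIdentifier(columnName)))
        {
            throw new IllegalArgumentException(String.format(
                "The %s column name %s requires QUOTE_IDENTIFIERS set to true", optionName, columnName));
        }
    }

    // validateColumnName("updatedTTL", "TTL")  -> throws: mixed case would be quoted
    // validateColumnName("updated_ttl", "TTL") -> passes: unchanged by quoting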

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
