This is an automated email from the ASF dual-hosted git repository.
saranyak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new c3e31eac CASSANALYTICS-120: Quote identifiers option must be set to
true if ttl has mixed case column name (#172)
c3e31eac is described below
commit c3e31eac9b0a60a569bba47de28beea33e357aec
Author: Saranya Krishnakumar <[email protected]>
AuthorDate: Wed Feb 25 10:12:31 2026 -0800
CASSANALYTICS-120: Quote identifiers option must be set to true if ttl has
mixed case column name (#172)
Patch by Saranya Krishnakumar; Reviewed by Francisco Guerrero for
CASSANALYTICS-120
---
CHANGES.txt | 1 +
.../cassandra/spark/bulkwriter/TableSchema.java | 54 ++++++++++++++++++++--
.../spark/bulkwriter/TableSchemaTest.java | 48 +++++++++++++++++++
.../spark/bulkwriter/TableSchemaTestCommon.java | 33 +++++++++----
.../cassandra/analytics/BulkWriteTtlTest.java | 22 +++++++++
.../cassandra/analytics/DataGenerationUtils.java | 7 ++-
6 files changed, 150 insertions(+), 15 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index caf58b05..7588c8b8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
+ * Quote identifiers option must be set to true if ttl has mixed case column
name (CASSANALYTICS-120)
* Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption
(CASSANALYTICS-116)
* Fix race condition in DirectStreamSession#onSSTablesProduced and
SortedSStableWriter#close (CASSANALYTICS-107)
* Address LZ4 vulnerability (CVE-2025-12183) (CASSANALYTICS-109)
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 93f02c14..8f0bbcef 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -73,12 +73,14 @@ public class TableSchema
{
this.writeMode = writeMode;
this.ttlOption = ttlOption;
+
this.timestampOption = timestampOption;
this.lowestCassandraVersion = lowestCassandraVersion;
this.quoteIdentifiers = quoteIdentifiers;
validateDataFrameCompatibility(dfSchema, tableInfo);
validateNoSecondaryIndexes(tableInfo);
+ validateUserAddedColumns(lowestCassandraVersion, quoteIdentifiers,
ttlOption, timestampOption);
this.createStatement = getCreateStatement(tableInfo);
this.modificationStatement = getModificationStatement(dfSchema,
tableInfo);
@@ -182,6 +184,7 @@ public class TableSchema
TimestampOption timestampOption)
{
CassandraBridge bridge =
CassandraBridgeFactory.get(lowestCassandraVersion);
+
List<String> columnNames = Arrays.stream(dfSchema.fieldNames())
.filter(fieldName ->
!fieldName.equals(ttlOption.columnName()))
.filter(fieldName ->
!fieldName.equals(timestampOption.columnName()))
@@ -260,7 +263,7 @@ public class TableSchema
Set<String> requiredKeyColumns = new
LinkedHashSet<>(getRequiredKeyColumns(tableInfo));
Preconditions.checkArgument(requiredKeyColumns.equals(dfFields),
String.format("Only partition key columns
(%s) are supported in the input Dataframe"
- + " when
WRITE_MODE=DELETE_PARTITION but (%s) columns were provided",
+ + " when
WRITE_MODE=DELETE_PARTITION but (%s) columns were provided",
String.join(",",
requiredKeyColumns), String.join(",", dfFields)));
}
@@ -269,10 +272,11 @@ public class TableSchema
// Make sure all primary key columns are provided
List<String> requiredKeyColumns = getRequiredKeyColumns(tableInfo);
Preconditions.checkArgument(dfFields.containsAll(requiredKeyColumns),
- "Missing some required key components in
DataFrame => " + requiredKeyColumns
- .stream()
- .filter(column ->
!dfFields.contains(column))
- .collect(Collectors.joining(",")));
+ "Missing some required key components in
DataFrame => "
+ + requiredKeyColumns
+ .stream()
+ .filter(column ->
!dfFields.contains(column))
+ .collect(Collectors.joining(",")));
}
private static void validateDataframeFieldsInTable(TableInfoProvider
tableInfo, Set<String> dfFields,
@@ -307,4 +311,44 @@ public class TableSchema
.map(dfFieldNames::indexOf)
.collect(Collectors.toList());
}
+
+ private static void validateUserAddedColumns(String
lowestCassandraVersion, boolean quoteIdentifiers,
+ TTLOption ttlOption,
TimestampOption timestampOption)
+ {
+ if (!quoteIdentifiers)
+ {
+ CassandraBridge bridge =
CassandraBridgeFactory.get(lowestCassandraVersion);
+ validateColumnName(bridge, ttlOption.columnName(),
WriterOptions.TTL.name());
+ validateColumnName(bridge, timestampOption.columnName(),
WriterOptions.TIMESTAMP.name());
+ }
+ }
+
+ /**
+ * Validates that the provided column name matches what would be produced
by maybeQuoteIdentifier. If they don't
match, it means the user provided a column name that needs quoting but
didn't enable the QUOTE_IDENTIFIERS option.
+ * We throw early to avoid scenarios such as mismatches in column names
leading to a bulk write overwriting
+ * existing TTL values with null.
+ *
+ * @param bridge the Cassandra bridge
+ * @param columnName the column name to validate
+ * @param optionName the option name for error messages
+ * @throws IllegalArgumentException if the column name requires quoting
but QUOTE_IDENTIFIERS is not enabled
+ */
+ private static void validateColumnName(CassandraBridge bridge, String
columnName, String optionName)
+ {
+ if (columnName == null || columnName.isEmpty())
+ {
+ return;
+ }
+
+ String quotedName = bridge.maybeQuoteIdentifier(columnName);
+ if (!columnName.equals(quotedName))
+ {
+ throw new IllegalArgumentException(
+ String.format("The %s column name %s requires spark option %s set
to true for correct conversion. Bulk " +
+ "write should provide a column name that matches CQL
requirements, or set %s to true to " +
+ "enable quoting for all identifiers.", optionName,
columnName,
+ WriterOptions.QUOTE_IDENTIFIERS.name(),
WriterOptions.QUOTE_IDENTIFIERS.name()));
+ }
+ }
}
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index 257034bf..266caed5 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -270,6 +270,54 @@ public class TableSchemaTest
.hasMessage("Bulkwriter doesn't support secondary indexes");
}
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+ public void testMixedCaseTTLColumnNameWithoutQuoteIdentifiersFails(String
cassandraVersion)
+ {
+ assertThatThrownBy(() -> getValidSchemaBuilder(cassandraVersion)
+ .withTTLSetting(TTLOption.from("updatedTTL"))
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("The TTL column name updatedTTL requires
spark option QUOTE_IDENTIFIERS set to true for correct conversion");
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+ public void
testMixedCaseTimestampColumnNameWithoutQuoteIdentifiersFails(String
cassandraVersion)
+ {
+ assertThatThrownBy(() -> getValidSchemaBuilder(cassandraVersion)
+ .withTimeStampSetting(TimestampOption.from("updatedTimestamp"))
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("The TIMESTAMP column name
updatedTimestamp requires spark option QUOTE_IDENTIFIERS set to true for
correct conversion");
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+ public void testMixedCaseTTLColumnNameWithQuoteIdentifiersSucceeds(String
cassandraVersion)
+ {
+ TableSchema schema = getValidSchemaBuilder(cassandraVersion)
+ .withTTLSetting(TTLOption.from("updatedTTL"))
+ .withQuotedIdentifiers()
+ .build();
+ assertThat(schema).isNotNull();
+ assertThat(trimUniqueTableName(schema.modificationStatement))
+ .contains("USING TTL :\"updatedTTL\"");
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
+ public void
testMixedCaseTimestampColumnNameWithQuoteIdentifiersSucceeds(String
cassandraVersion)
+ {
+ TableSchema schema = getValidSchemaBuilder(cassandraVersion)
+ .withTimeStampSetting(TimestampOption.from("updatedTimestamp"))
+ .withQuotedIdentifiers()
+ .build();
+ assertThat(schema).isNotNull();
+ assertThat(trimUniqueTableName(schema.modificationStatement))
+ .contains("USING TIMESTAMP :\"updatedTimestamp\"");
+ }
+
private TableSchemaTestCommon.MockTableSchemaBuilder
getValidSchemaBuilder(String cassandraVersion)
{
return new
TableSchemaTestCommon.MockTableSchemaBuilder(CassandraBridgeFactory.get(cassandraVersion))
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index 32fddffc..cff2e6f4 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -270,6 +270,16 @@ public final class TableSchemaTestCommon
return this;
}
+ private ImmutableMap<String, CqlField.CqlType>
addColumnToCqlColumns(ImmutableMap<String, CqlField.CqlType> currentColumns,
+
String columnName,
+
String cqlType)
+ {
+ ImmutableMap.Builder<String, CqlField.CqlType> builder =
ImmutableMap.builder();
+ builder.putAll(currentColumns);
+ builder.put(columnName, mockCqlType(cqlType));
+ return builder.build();
+ }
+
public TableSchema build()
{
Objects.requireNonNull(cqlColumns,
@@ -286,21 +296,26 @@ public final class TableSchemaTestCommon
"writeMode cannot be null. Please provide
the write mode by calling #withWriteMode");
Objects.requireNonNull(dataFrameSchema,
"dataFrameSchema cannot be null. Please
provide the write mode by calling #withDataFrameSchema");
- MockTableInfoProvider tableInfoProvider = new
MockTableInfoProvider(bridge,
-
cqlColumns,
-
partitionKeyColumns,
-
partitionKeyColumnTypes,
-
primaryKeyColumnNames,
-
cassandraVersion,
-
quoteIdentifiers);
+
+ ImmutableMap<String, CqlField.CqlType> updatedCqlColumns =
cqlColumns;
if (ttlOption.withTTl() && ttlOption.columnName() != null)
{
- dataFrameSchema = dataFrameSchema.add("ttl",
DataTypes.IntegerType);
+ dataFrameSchema = dataFrameSchema.add(ttlOption.columnName(),
DataTypes.IntegerType);
+ updatedCqlColumns = addColumnToCqlColumns(updatedCqlColumns,
ttlOption.columnName(), SqlToCqlTypeConverter.INT);
}
if (timestampOption.withTimestamp() &&
timestampOption.columnName() != null)
{
- dataFrameSchema = dataFrameSchema.add("timestamp",
DataTypes.IntegerType);
+ dataFrameSchema =
dataFrameSchema.add(timestampOption.columnName(), DataTypes.LongType);
+ updatedCqlColumns = addColumnToCqlColumns(updatedCqlColumns,
timestampOption.columnName(), SqlToCqlTypeConverter.BIGINT);
}
+
+ MockTableInfoProvider tableInfoProvider = new
MockTableInfoProvider(bridge,
+
updatedCqlColumns,
+
partitionKeyColumns,
+
partitionKeyColumnTypes,
+
primaryKeyColumnNames,
+
cassandraVersion,
+
quoteIdentifiers);
return new TableSchema(dataFrameSchema, tableInfoProvider,
writeMode, ttlOption, timestampOption, cassandraVersion, quoteIdentifiers);
}
}
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
index d5d198eb..bed89e26 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
@@ -44,6 +44,7 @@ class BulkWriteTtlTest extends
SharedClusterSparkIntegrationTestBase
static final QualifiedName DEFAULT_TTL_NAME = new
QualifiedName(TEST_KEYSPACE, "test_default_ttl");
static final QualifiedName CONSTANT_TTL_NAME = new
QualifiedName(TEST_KEYSPACE, "test_ttl_constant");
static final QualifiedName PER_ROW_TTL_NAME = new
QualifiedName(TEST_KEYSPACE, "test_ttl_per_row");
+ static final QualifiedName MIXED_CASE_TTL_NAME = new
QualifiedName(TEST_KEYSPACE, "test_ttl_mixed_case");
@Test
void testTableDefaultTtl()
@@ -87,6 +88,21 @@ class BulkWriteTtlTest extends
SharedClusterSparkIntegrationTestBase
assertThat(result.hasNext()).isFalse();
}
+ @Test
+ void testTtlOptionPerRowWithMixedCaseColumnName()
+ {
+ SparkSession spark = getOrCreateSparkSession();
+ Dataset<Row> df = DataGenerationUtils.generateCourseData(spark, 1,
"updatedTTL", null, ROW_COUNT);
+
+ bulkWriterDataFrameWriter(df,
MIXED_CASE_TTL_NAME).option(WriterOptions.TTL.name(),
TTLOption.perRow("updatedTTL"))
+
.option(WriterOptions.QUOTE_IDENTIFIERS.name(), true)
+ .save();
+ // Wait to make sure TTLs have expired
+ Uninterruptibles.sleepUninterruptibly(1100, TimeUnit.MILLISECONDS);
+ SimpleQueryResult result =
cluster.coordinator(1).executeWithResult("SELECT * FROM " +
MIXED_CASE_TTL_NAME, ConsistencyLevel.ALL);
+ assertThat(result.hasNext()).isFalse();
+ }
+
@Override
protected ClusterBuilderConfiguration testClusterConfiguration()
{
@@ -117,5 +133,11 @@ class BulkWriteTtlTest extends
SharedClusterSparkIntegrationTestBase
+ " marks
BIGINT\n"
+ " );"
);
+ cluster.schemaChangeIgnoringStoppedInstances("CREATE TABLE " +
MIXED_CASE_TTL_NAME + " (\n"
+ + " id BIGINT
PRIMARY KEY,\n"
+ + " course
TEXT,\n"
+ + " marks
BIGINT\n"
+ + " );"
+ );
}
}
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java
index 0503c034..8c4623c9 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/DataGenerationUtils.java
@@ -153,6 +153,11 @@ public final class DataGenerationUtils
}
public static Dataset<Row> generateCourseData(SparkSession spark, Integer
ttl, Long timestamp, int rowCount)
+ {
+ return generateCourseData(spark, ttl, "ttl", timestamp, rowCount);
+ }
+
+ public static Dataset<Row> generateCourseData(SparkSession spark, Integer
ttl, String ttlColumnName, Long timestamp, int rowCount)
{
SQLContext sql = spark.sqlContext();
// Note: only primary key columns are required to be not nullable; All
columns are nullable only for test convenience
@@ -163,7 +168,7 @@ public final class DataGenerationUtils
if (ttl != null)
{
- schema = schema.add("ttl", IntegerType, false);
+ schema = schema.add(ttlColumnName, IntegerType, false);
}
if (timestamp != null)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]