This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7c3bbc  CASSANDRA-19031: Fix bulk writing when using identifiers that need quotes
c7c3bbc is described below

commit c7c3bbca2c7cb415b39689e924fa2357c239f043
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Tue Nov 14 16:28:14 2023 -0800

    CASSANDRA-19031: Fix bulk writing when using identifiers that need quotes
    
    Cassandra treats all identifiers (i.e. keyspace names, table names, column
    names, etc.) as lower case unless they are explicitly quoted by the user.
    We can define a case-sensitive identifier, or use a reserved word as an
    identifier, by quoting it during DDL creation.
    
    In the analytics library, bulk writing fails when we encounter these
    identifiers. In this commit, we fix the issue by properly propagating the
    information about whether identifiers need to be quoted, exposing a new
    dataframe option (`quote_identifiers`). When set to `true`, the writer
    quotes the keyspace/table/column names as needed, so it can correctly
    write data when the identifiers use mixed case or reserved words.
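    
    As a usage sketch (the data source class name and the keyspace/table
    values below are illustrative assumptions, not part of this patch):
    
        Dataset<Row> df = sparkSession.createDataFrame(rows, schema);
        df.write()
          .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")  // assumed writer source name
          .option("keyspace", "Cycling")        // mixed-case keyspace
          .option("table", "RankByYear")        // mixed-case table
          .option("quote_identifiers", "true")  // quote identifiers as needed
          .mode("append")
          .save();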
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19031
---
 .circleci/config.yml                               |   2 +
 CHANGES.txt                                        |   1 +
 .../cassandra/bridge/CassandraBridgeFactory.java   |  14 +-
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |   2 +
 .../bulkwriter/CassandraBulkWriterContext.java     |  38 ++++-
 .../spark/bulkwriter/CassandraClusterInfo.java     |  23 ++-
 .../spark/bulkwriter/CassandraJobInfo.java         |  17 +-
 .../apache/cassandra/spark/bulkwriter/JobInfo.java |  13 +-
 .../spark/bulkwriter/SidecarDataTransferApi.java   |  28 +++-
 .../cassandra/spark/bulkwriter/TTLOption.java      |   6 +-
 .../cassandra/spark/bulkwriter/TableSchema.java    |  59 ++++---
 .../spark/bulkwriter/TimestampOption.java          |   6 +-
 .../cassandra/spark/bulkwriter/WriterOptions.java  |   7 +-
 .../spark/bulkwriter/util/SbwKryoRegistrator.java  |   2 +-
 .../spark/bulkwriter/BulkSparkConfTest.java        |  11 ++
 .../spark/bulkwriter/MockBulkWriterContext.java    |  90 +++++++----
 .../spark/bulkwriter/RecordWriterTest.java         |  41 ++++-
 .../spark/bulkwriter/TableSchemaTest.java          |  25 ++-
 .../spark/bulkwriter/TableSchemaTestCommon.java    |  68 ++++++--
 .../sidecar/testing/IntegrationTestBase.java       |  46 +++---
 .../cassandra/analytics/BulkWriteTtlTest.java      |   6 +-
 ...iersTest.java => QuoteIdentifiersReadTest.java} |  51 ++----
 .../analytics/QuoteIdentifiersWriteTest.java       | 171 +++++++++++++++++++++
 .../analytics/SparkBulkWriterSimpleTest.java       |   2 +-
 .../analytics/SparkIntegrationTestBase.java        |  25 ++-
 25 files changed, 580 insertions(+), 174 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index df4b720..bdf0a5a 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -58,6 +58,7 @@ jobs:
             SPARK_VERSION: "2"
             SCALA_VERSION: "2.11"
             JDK_VERSION: "1.8"
+            INTEGRATION_MAX_PARALLEL_FORKS: 2
 
       - store_artifacts:
           path: build/test-reports
@@ -85,6 +86,7 @@ jobs:
             SPARK_VERSION: "2"
             SCALA_VERSION: "2.12"
             JDK_VERSION: "1.8"
+            INTEGRATION_MAX_PARALLEL_FORKS: 2
 
       - store_artifacts:
           path: build/test-reports
diff --git a/CHANGES.txt b/CHANGES.txt
index 9aaa0d3..f580f89 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Fix bulk writing when using identifiers that need quotes (CASSANDRA-19031)
  * Fix bulk reading when using identifiers that need quotes (CASSANDRA-19024)
  * Remove unused dead code (CASSANDRA-19148)
 * Get Sidecar port through CassandraContext for more flexibility (CASSANDRA-19903)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
index 7e2cd51..1f69a4c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java
@@ -124,7 +124,6 @@ public final class CassandraBridgeFactory
             Class<CassandraBridge> bridge = (Class<CassandraBridge>) loader.loadClass("org.apache.cassandra.bridge.CassandraBridgeImplementation");
             Constructor<CassandraBridge> constructor = bridge.getConstructor();
             return constructor.newInstance();
-
         }
         catch (IOException | ClassNotFoundException | NoSuchMethodException | InstantiationException
              | IllegalAccessException | InvocationTargetException exception)
@@ -132,4 +131,17 @@ public final class CassandraBridgeFactory
             throw new RuntimeException("Failed to create Cassandra bridge for label " + label, exception);
         }
     }
+
+    /**
+     * Returns the quoted name when the {@code quoteIdentifiers} parameter is {@code true} <i>AND</i> the
+     * {@code unquotedName} needs to be quoted (i.e. it uses mixed case, or it is a Cassandra reserved word).
+     * @param bridge the Cassandra bridge
+     * @param quoteIdentifiers whether identifiers should be quoted
+     * @param unquotedName the unquoted name to maybe be quoted
+     * @return the quoted name when the conditions are met
+     */
+    public static String maybeQuotedIdentifier(CassandraBridge bridge, boolean quoteIdentifiers, String unquotedName)
+    {
+        return quoteIdentifiers ? bridge.maybeQuoteIdentifier(unquotedName) : unquotedName;
+    }
 }
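
A minimal sketch of the helper's behavior, assuming maybeQuoteIdentifier() quotes only mixed-case or reserved-word identifiers as the javadoc above describes:

    CassandraBridge bridge = CassandraBridgeFactory.get("4.0.0");            // version string illustrative
    CassandraBridgeFactory.maybeQuotedIdentifier(bridge, false, "MyTable");  // -> MyTable   (quoting disabled)
    CassandraBridgeFactory.maybeQuotedIdentifier(bridge, true, "mytable");   // -> mytable   (no quoting needed)
    CassandraBridgeFactory.maybeQuotedIdentifier(bridge, true, "MyTable");   // -> "MyTable" (mixed case)
    CassandraBridgeFactory.maybeQuotedIdentifier(bridge, true, "limit");     // -> "limit"   (reserved word)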
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index b98d4c5..7ea972e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -126,6 +126,7 @@ public class BulkSparkConf implements Serializable
     protected final SparkConf conf;
     public final boolean validateSSTables;
     public final int commitThreadsPerInstance;
+    public boolean quoteIdentifiers;
     protected final int effectiveSidecarPort;
     protected final int userProvidedSidecarPort;
     protected boolean useOpenSsl;
@@ -166,6 +167,7 @@ public class BulkSparkConf implements Serializable
         this.ringRetryCount = getInt(RING_RETRY_COUNT, 
DEFAULT_RING_RETRY_COUNT);
         this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), 
null);
         this.timestamp = MapUtils.getOrDefault(options, 
WriterOptions.TIMESTAMP.name(), null);
+        this.quoteIdentifiers = MapUtils.getBoolean(options, 
WriterOptions.QUOTE_IDENTIFIERS.name(), false, "quote identifiers");
         validateEnvironment();
     }
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index f093e13..439b1a6 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -63,6 +63,9 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
     {
         this.conf = conf;
         this.clusterInfo = clusterInfo;
+        String lowestCassandraVersion = 
clusterInfo.getLowestCassandraVersion();
+        CassandraBridge bridge = 
CassandraBridgeFactory.get(lowestCassandraVersion);
+
         CassandraRing<RingInstance> ring = clusterInfo.getRing(true);
         jobInfo = new CassandraJobInfo(conf,
                                        new TokenPartitioner(ring, 
conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores()));
@@ -71,20 +74,20 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
                                     String.format("Keyspace %s is not 
replicated on datacenter %s",
                                                   conf.keyspace, 
conf.localDC));
 
-        String keyspace = conf.keyspace;
-        String table = conf.table;
+        String keyspace = jobInfo.keyspace();
+        String table = jobInfo.tableName();
 
         String keyspaceSchema = clusterInfo.getKeyspaceSchema(true);
-        CassandraBridge bridge = 
CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion());
         Partitioner partitioner = clusterInfo.getPartitioner();
-        String tableSchema = CqlUtils.extractTableSchema(keyspaceSchema, 
keyspace, table);
+        String createTableSchema = CqlUtils.extractTableSchema(keyspaceSchema, 
keyspace, table);
         Set<String> udts = CqlUtils.extractUdts(keyspaceSchema, keyspace);
         ReplicationFactor replicationFactor = 
CqlUtils.extractReplicationFactor(keyspaceSchema, keyspace);
         int indexCount = CqlUtils.extractIndexCount(keyspaceSchema, keyspace, 
table);
-        CqlTable cqlTable = bridge.buildSchema(tableSchema, keyspace, 
replicationFactor, partitioner, udts, null, indexCount);
+        CqlTable cqlTable = bridge.buildSchema(createTableSchema, keyspace, 
replicationFactor, partitioner, udts, null, indexCount);
 
-        TableInfoProvider tableInfoProvider = new 
CqlTableInfoProvider(tableSchema, cqlTable);
-        schemaInfo = new CassandraSchemaInfo(new TableSchema(dfSchema, 
tableInfoProvider, conf.writeMode, conf.getTTLOptions(), 
conf.getTimestampOptions()));
+        TableInfoProvider tableInfoProvider = new 
CqlTableInfoProvider(createTableSchema, cqlTable);
+        TableSchema tableSchema = initializeTableSchema(conf, dfSchema, 
tableInfoProvider, lowestCassandraVersion);
+        schemaInfo = new CassandraSchemaInfo(tableSchema);
     }
 
     public static BulkWriterContext fromOptions(@NotNull SparkContext 
sparkContext,
@@ -180,8 +183,27 @@ public class CassandraBulkWriterContext implements 
BulkWriterContext, KryoSerial
     {
         if (dataTransferApi == null)
         {
-            dataTransferApi = new 
SidecarDataTransferApi(clusterInfo.getCassandraContext(), jobInfo, conf);
+            CassandraBridge bridge = 
CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion());
+            dataTransferApi = new 
SidecarDataTransferApi(clusterInfo.getCassandraContext(),
+                                                         bridge,
+                                                         jobInfo,
+                                                         conf);
         }
         return dataTransferApi;
     }
+
+    @NotNull
+    protected TableSchema initializeTableSchema(@NotNull BulkSparkConf conf,
+                                                @NotNull StructType dfSchema,
+                                                TableInfoProvider 
tableInfoProvider,
+                                                String lowestCassandraVersion)
+    {
+        return new TableSchema(dfSchema,
+                               tableInfoProvider,
+                               conf.writeMode,
+                               conf.getTTLOptions(),
+                               conf.getTimestampOptions(),
+                               lowestCassandraVersion,
+                               conf.quoteIdentifiers);
+    }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index b490390..19c7137 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -34,6 +34,8 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersionFeatures;
 import org.apache.cassandra.clients.Sidecar;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
@@ -53,6 +55,8 @@ import org.apache.cassandra.spark.utils.CqlUtils;
 import org.apache.cassandra.spark.utils.FutureUtils;
 import org.jetbrains.annotations.NotNull;
 
+import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier;
+
 public class CassandraClusterInfo implements ClusterInfo, Closeable
 {
     private static final long serialVersionUID = -6944818863462956767L;
@@ -233,7 +237,9 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
 
     protected String getCurrentKeyspaceSchema() throws Exception
     {
-        SchemaResponse schemaResponse = 
getCassandraContext().getSidecarClient().schema(conf.keyspace).get();
+        SchemaResponse schemaResponse = 
getCassandraContext().getSidecarClient()
+                                                             
.schema(maybeQuotedIdentifier(bridge(), conf.quoteIdentifiers, conf.keyspace))
+                                                             .get();
         return schemaResponse.schema();
     }
 
@@ -243,7 +249,11 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
         RingResponse ringResponse = getCurrentRingResponse();
         List<RingInstance> instances = getSerializableInstances(ringResponse);
         ReplicationFactor replicationFactor = getReplicationFactor();
-        return new CassandraRing<>(getPartitioner(), conf.keyspace, 
replicationFactor, instances);
+
+        return new CassandraRing<>(getPartitioner(),
+                                   maybeQuotedIdentifier(bridge(), 
conf.quoteIdentifiers, conf.keyspace),
+                                   replicationFactor,
+                                   instances);
     }
 
     @NotNull
@@ -403,7 +413,9 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
 
     private RingResponse getCurrentRingResponse() throws Exception
     {
-        return 
getCassandraContext().getSidecarClient().ring(conf.keyspace).get();
+        return getCassandraContext().getSidecarClient()
+                                    .ring(maybeQuotedIdentifier(bridge(), 
conf.quoteIdentifiers, conf.keyspace))
+                                    .get();
     }
 
     private static List<RingInstance> getSerializableInstances(RingResponse 
ringResponse)
@@ -535,6 +547,11 @@ public class CassandraClusterInfo implements ClusterInfo, 
Closeable
         return false;
     }
 
+    protected CassandraBridge bridge()
+    {
+        return CassandraBridgeFactory.get(getLowestCassandraVersion());
+    }
+
     // Startup Validation
 
     @Override
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index 5840c8a..73104fa 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -113,9 +113,20 @@ public class CassandraJobInfo implements JobInfo
     }
 
     @Override
-    @NotNull
-    public String getFullTableName()
+    public boolean quoteIdentifiers()
+    {
+        return conf.quoteIdentifiers;
+    }
+
+    @Override
+    public String keyspace()
+    {
+        return conf.keyspace;
+    }
+
+    @Override
+    public String tableName()
     {
-        return conf.keyspace + "." + conf.table;
+        return conf.table;
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index f7ac149..15185b3 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -52,6 +52,17 @@ public interface JobInfo extends Serializable
 
     boolean skipExtendedVerify();
 
-    String getFullTableName();
+    boolean quoteIdentifiers();
+
+    String keyspace();
+
+    String tableName();
+
+    @NotNull
+    default String getFullTableName()
+    {
+        return keyspace() + "." + tableName();
+    }
+
     boolean getSkipClean();
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java
index c3cad7f..8ce5619 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.sidecar.client.SidecarClient;
 import org.apache.cassandra.sidecar.client.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.client.request.ImportSSTableRequest;
@@ -35,6 +36,8 @@ import org.apache.cassandra.spark.common.MD5Hash;
 import org.apache.cassandra.spark.common.client.ClientException;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 
+import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier;
+
 /**
  * A {@link DataTransferApi} implementation that interacts with Cassandra 
Sidecar
  */
@@ -46,14 +49,16 @@ public class SidecarDataTransferApi implements 
DataTransferApi
     private static final int SSTABLE_GENERATION_REVERSE_OFFSET = 3;
 
     private final transient SidecarClient sidecarClient;
+    private final CassandraBridge bridge;
     private final int sidecarPort;
     private final JobInfo job;
     private final BulkSparkConf conf;
 
-    public SidecarDataTransferApi(CassandraContext cassandraContext, JobInfo 
job, BulkSparkConf conf)
+    public SidecarDataTransferApi(CassandraContext cassandraContext, 
CassandraBridge bridge, JobInfo job, BulkSparkConf conf)
     {
         this.sidecarClient = cassandraContext.getSidecarClient();
         this.sidecarPort = cassandraContext.sidecarPort();
+        this.bridge = bridge;
         this.job = job;
         this.conf = conf;
     }
@@ -69,17 +74,22 @@ public class SidecarDataTransferApi implements 
DataTransferApi
         String uploadId = getUploadId(sessionID, job.getId().toString());
         try
         {
-            sidecarClient.uploadSSTableRequest(toSidecarInstance(instance), 
conf.keyspace, conf.table, uploadId,
-                                               componentName, 
fileHash.toString(),
-                                               
componentFile.toAbsolutePath().toString()).get();
+            sidecarClient.uploadSSTableRequest(toSidecarInstance(instance),
+                                               maybeQuotedIdentifier(bridge, 
conf.quoteIdentifiers, conf.keyspace),
+                                               maybeQuotedIdentifier(bridge, 
conf.quoteIdentifiers, conf.table),
+                                               uploadId,
+                                               componentName,
+                                               fileHash.toString(),
+                                               
componentFile.toAbsolutePath().toString())
+                         .get();
         }
         catch (ExecutionException | InterruptedException exception)
         {
             LOGGER.warn("Failed to upload file={}, keyspace={}, table={}, 
uploadId={}, componentName={}, instance={}",
                         componentFile, conf.keyspace, conf.table, uploadId, 
componentName, instance);
             throw new ClientException(
-                    String.format("Failed to upload file=%s into keyspace=%s, 
table=%s, componentName=%s with uploadId=%s to instance=%s",
-                                  componentFile, conf.keyspace, conf.table, 
componentName, uploadId, instance), exception);
+            String.format("Failed to upload file=%s into keyspace=%s, 
table=%s, componentName=%s with uploadId=%s to instance=%s",
+                          componentFile, conf.keyspace, conf.table, 
componentName, uploadId, instance), exception);
         }
     }
 
@@ -104,7 +114,11 @@ public class SidecarDataTransferApi implements 
DataTransferApi
         try
         {
             SSTableImportResponse response =
-            sidecarClient.importSSTableRequest(toSidecarInstance(instance), 
conf.keyspace, conf.table, uploadId, importOptions).get();
+            sidecarClient.importSSTableRequest(toSidecarInstance(instance),
+                                               maybeQuotedIdentifier(bridge, 
conf.quoteIdentifiers, conf.keyspace),
+                                               maybeQuotedIdentifier(bridge, 
conf.quoteIdentifiers, conf.table),
+                                               uploadId,
+                                               importOptions).get();
             if (response.success())
             {
                 return new RemoteCommitResult(response.success(), 
Collections.emptyList(), Collections.singletonList(uploadId), null);
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
index 21a07be..af9c195 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.function.Function;
 
 public final class TTLOption implements Serializable
 {
@@ -111,12 +112,11 @@ public final class TTLOption implements Serializable
                 && (ttlColumnName != null || ttlInSeconds != null);
     }
 
-    @Override
-    public String toString()
+    public String toCQLString(Function<String, String> maybeQuoteFunction)
     {
         if (ttlColumnName != null && !ttlColumnName.isEmpty())
         {
-            return ":" + ttlColumnName;
+            return ":" + maybeQuoteFunction.apply(ttlColumnName);
         }
         if (ttlInSeconds != null)
         {
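
TTLOption (like TimestampOption below) now renders through toCQLString(Function) so the caller supplies the quoting policy; a small sketch, assuming TTLOption.from() accepts either a column name or a constant as the tests in this patch use it:

    Function<String, String> alwaysQuote = name -> '"' + name + '"';  // illustrative policy; real callers pass maybeQuotedIdentifier
    TTLOption.from("ttl").toCQLString(alwaysQuote);           // -> :"ttl"
    TTLOption.from("1000").toCQLString(Function.identity());  // -> 1000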
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index fa3371f..99a6a8d 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -29,13 +29,17 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.base.Preconditions;
-import org.apache.cassandra.spark.data.CqlField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.spark.common.schema.ColumnType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.spark.sql.types.StructType;
 
+import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier;
+
 public class TableSchema implements Serializable
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TableSchema.class);
@@ -49,13 +53,22 @@ public class TableSchema implements Serializable
     private final WriteMode writeMode;
     private final TTLOption ttlOption;
     private final TimestampOption timestampOption;
+    private final String lowestCassandraVersion;
+    private final boolean quoteIdentifiers;
 
-    public TableSchema(StructType dfSchema, TableInfoProvider tableInfo, 
WriteMode writeMode,
-                       TTLOption ttlOption, TimestampOption timestampOption)
+    public TableSchema(StructType dfSchema,
+                       TableInfoProvider tableInfo,
+                       WriteMode writeMode,
+                       TTLOption ttlOption,
+                       TimestampOption timestampOption,
+                       String lowestCassandraVersion,
+                       boolean quoteIdentifiers)
     {
         this.writeMode = writeMode;
         this.ttlOption = ttlOption;
         this.timestampOption = timestampOption;
+        this.lowestCassandraVersion = lowestCassandraVersion;
+        this.quoteIdentifiers = quoteIdentifiers;
 
         validateDataFrameCompatibility(dfSchema, tableInfo);
         validateNoSecondaryIndexes(tableInfo);
@@ -123,7 +136,7 @@ public class TableSchema implements Serializable
                          CqlField.CqlType cqlType = 
tableInfo.getColumnType(fieldName);
                          return SqlToCqlTypeConverter.getConverter(cqlType);
                      })
-                    .collect(Collectors.toList());
+                     .collect(Collectors.toList());
     }
 
     private static List<ColumnType<?>> 
getPartitionKeyColumnTypes(TableInfoProvider tableInfo)
@@ -156,35 +169,44 @@ public class TableSchema implements Serializable
         }
     }
 
-    private static String getInsertStatement(StructType dfSchema, 
TableInfoProvider tableInfo,
-                                             TTLOption ttlOption, 
TimestampOption timestampOption)
+    private String getInsertStatement(StructType dfSchema,
+                                      TableInfoProvider tableInfo,
+                                      TTLOption ttlOption,
+                                      TimestampOption timestampOption)
     {
+        CassandraBridge bridge = 
CassandraBridgeFactory.get(lowestCassandraVersion);
         List<String> columnNames = Arrays.stream(dfSchema.fieldNames())
                                          .filter(fieldName -> 
!fieldName.equals(ttlOption.columnName()))
                                          .filter(fieldName -> 
!fieldName.equals(timestampOption.columnName()))
                                          .collect(Collectors.toList());
         StringBuilder stringBuilder = new StringBuilder("INSERT INTO ")
-                .append(tableInfo.getKeyspaceName())
-                .append(".").append(tableInfo.getName())
-                .append(columnNames.stream().collect(Collectors.joining(",", " 
(", ") ")))
-                .append("VALUES")
-                .append(columnNames.stream().map(columnName -> ":" + 
columnName).collect(Collectors.joining(",", " (", ")")));
+                                      .append(maybeQuotedIdentifier(bridge, 
quoteIdentifiers, tableInfo.getKeyspaceName()))
+                                      .append(".")
+                                      .append(maybeQuotedIdentifier(bridge, 
quoteIdentifiers, tableInfo.getName()))
+                                      .append(columnNames.stream()
+                                                         .map(columnName -> 
maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName))
+                                                         
.collect(Collectors.joining(",", " (", ") ")));
+
+        stringBuilder.append("VALUES")
+                     .append(columnNames.stream()
+                                        .map(columnName -> ":" + 
maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName))
+                                        .collect(Collectors.joining(",", " (", 
")")));
         if (ttlOption.withTTl() && timestampOption.withTimestamp())
         {
             stringBuilder.append(" USING TIMESTAMP ")
-                         .append(timestampOption)
+                         .append(timestampOption.toCQLString(columnName -> 
maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName)))
                          .append(" AND TTL ")
-                         .append(ttlOption);
+                         .append(ttlOption.toCQLString(columnName -> 
maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName)));
         }
         else if (timestampOption.withTimestamp())
         {
             stringBuilder.append(" USING TIMESTAMP ")
-                         .append(timestampOption);
+                         .append(timestampOption.toCQLString(columnName -> 
maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName)));
         }
         else if (ttlOption.withTTl())
         {
             stringBuilder.append(" USING TTL ")
-                    .append(ttlOption);
+                         .append(ttlOption.toCQLString(columnName -> 
maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName)));
         }
         stringBuilder.append(";");
         String insertStatement = stringBuilder.toString();
@@ -195,10 +217,11 @@ public class TableSchema implements Serializable
 
     private String getDeleteStatement(StructType dfSchema, TableInfoProvider 
tableInfo)
     {
-        Stream<String> fieldEqualityStatements = 
Arrays.stream(dfSchema.fieldNames()).map(key -> key + "=?");
+        CassandraBridge bridge = 
CassandraBridgeFactory.get(lowestCassandraVersion);
+        Stream<String> fieldEqualityStatements = 
Arrays.stream(dfSchema.fieldNames()).map(key -> maybeQuotedIdentifier(bridge, 
quoteIdentifiers, key) + "=?");
         String deleteStatement = String.format("DELETE FROM %s.%s where %s;",
-                                               tableInfo.getKeyspaceName(),
-                                               tableInfo.getName(),
+                                               maybeQuotedIdentifier(bridge, 
quoteIdentifiers, tableInfo.getKeyspaceName()),
+                                               maybeQuotedIdentifier(bridge, 
quoteIdentifiers, tableInfo.getName()),
                                                
fieldEqualityStatements.collect(Collectors.joining(" AND ")));
 
         LOGGER.info("CQL delete statement for the RDD {}", deleteStatement);
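
With quoting enabled, the generated statements come out along these lines (illustrative output for a mixed-case keyspace/table, a mixed-case "ID" column, and the reserved word "limit"):

    INSERT INTO "Cycling"."RankByYear" ("ID",date,"limit") VALUES (:"ID",:date,:"limit");
    DELETE FROM "Cycling"."RankByYear" where "ID"=? AND date=?;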
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
index 06307b9..c63dbe7 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
@@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter;
 import java.io.Serializable;
 import java.time.Duration;
 import java.time.temporal.ChronoUnit;
+import java.util.function.Function;
 
 public final class TimestampOption implements Serializable
 {
@@ -109,12 +110,11 @@ public final class TimestampOption implements Serializable
                 && (timestampColumnName != null || timeStampInMicroSeconds != 
null);
     }
 
-    @Override
-    public String toString()
+    public String toCQLString(Function<String, String> maybeQuoteFunction)
     {
         if (timestampColumnName != null && !timestampColumnName.isEmpty())
         {
-            return ":" + timestampColumnName;
+            return ":" + maybeQuoteFunction.apply(timestampColumnName);
         }
         if (timeStampInMicroSeconds != null)
         {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index d824f05..95c7876 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -45,5 +45,10 @@ public enum WriterOptions implements WriterOption
     ROW_BUFFER_MODE,
     SSTABLE_DATA_SIZE_IN_MB,
     TTL,
-    TIMESTAMP
+    TIMESTAMP,
+    /**
+     * Option that specifies whether the identifiers (i.e. keyspace, table name, column names) should be quoted to
+     * support mixed case and reserved keyword names for these fields.
+     */
+    QUOTE_IDENTIFIERS,
 }
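
Writer option keys are looked up case-insensitively (the tests build the options map with String.CASE_INSENSITIVE_ORDER), so the enum name and the lower-case dataframe key are interchangeable:

    Map<String, String> options = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
    options.put(WriterOptions.QUOTE_IDENTIFIERS.name(), "true");
    // equivalent via the dataframe API: .option("quote_identifiers", "true")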
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
index baf66ac..1c5b8df 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java
@@ -53,7 +53,7 @@ public class SbwKryoRegistrator implements KryoRegistrator
     {
         LOGGER.debug("Registering Spark Bulk Writer classes with Kryo which require use of Java Serializer");
         // NOTE: The order of calls to `register` matters, so we sort by class name just to make sure we always
-        //       register classess in the same order - HashSet doesn't guarantee its iteration order
+        //       register classes in the same order - HashSet doesn't guarantee its iteration order
         javaSerializableClasses.stream()
                 .sorted(Comparator.comparing(Class::getCanonicalName))
                 .forEach(javaSerializableClass -> kryo.register(javaSerializableClass, new SbwJavaSerializer()));
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index 2c22f3e..bc46a21 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -268,6 +268,17 @@ public class BulkSparkConfTest
         assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
     }
 
+    @Test
+    void testQuoteIdentifiers()
+    {
+        assertFalse(bulkSparkConf.quoteIdentifiers);
+        Map<String, String> options = copyDefaultOptions();
+        options.put(WriterOptions.QUOTE_IDENTIFIERS.name(), "true");
+        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
+        assertNotNull(bulkSparkConf);
+        assertTrue(bulkSparkConf.quoteIdentifiers);
+    }
+
     private Map<String, String> copyDefaultOptions()
     {
         TreeMap<String, String> map = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 19ba2d6..8aee396 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
@@ -63,6 +64,14 @@ import static 
org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCq
 public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, 
JobInfo, SchemaInfo, DataTransferApi
 {
     private static final long serialVersionUID = -2912371629236770646L;
+    public static final String[] DEFAULT_PARTITION_KEY_COLUMNS = {"id", 
"date"};
+    public static final String[] DEFAULT_PRIMARY_KEY_COLUMN_NAMES = {"id", 
"date"};
+    public static final Pair<StructType, ImmutableMap<String, 
CqlField.CqlType>> DEFAULT_VALID_PAIR =
+    TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns(
+    new String[]{"id", "date", "course", "marks"},
+    new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType, 
DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType},
+    new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), 
mockCqlType(VARCHAR), mockCqlType(INT)});
+    private final boolean quoteIdentifiers;
     private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED;
     private ConsistencyLevel.CL consistencyLevel;
     private int sstableDataSizeInMB = 128;
@@ -90,35 +99,57 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
     private CommitResultSupplier crSupplier = (uuids, dc) -> new 
RemoteCommitResult(true, Collections.emptyList(), uuids, null);
 
     private Predicate<CassandraInstance> uploadRequestConsumer = instance -> 
true;
-    private TTLOption ttlOption = TTLOption.forever();
+
+    public MockBulkWriterContext(CassandraRing<RingInstance> ring)
+    {
+        this(ring,
+             DEFAULT_CASSANDRA_VERSION,
+             ConsistencyLevel.CL.LOCAL_QUORUM,
+             DEFAULT_VALID_PAIR,
+             DEFAULT_PARTITION_KEY_COLUMNS,
+             DEFAULT_PRIMARY_KEY_COLUMN_NAMES,
+             false);
+    }
 
     public MockBulkWriterContext(CassandraRing<RingInstance> ring,
                                  String cassandraVersion,
                                  ConsistencyLevel.CL consistencyLevel)
     {
+        this(ring, cassandraVersion, consistencyLevel, DEFAULT_VALID_PAIR, 
DEFAULT_PARTITION_KEY_COLUMNS, DEFAULT_PRIMARY_KEY_COLUMN_NAMES, false);
+    }
+
+    public MockBulkWriterContext(CassandraRing<RingInstance> ring,
+                                 String cassandraVersion,
+                                 ConsistencyLevel.CL consistencyLevel,
+                                 Pair<StructType, ImmutableMap<String, 
CqlField.CqlType>> validPair,
+                                 String[] partitionKeyColumns,
+                                 String[] primaryKeyColumnNames,
+                                 boolean quoteIdentifiers)
+    {
+        this.quoteIdentifiers = quoteIdentifiers;
         this.ring = ring;
         this.tokenPartitioner = new TokenPartitioner(ring, 1, 2, 2, false);
         this.cassandraVersion = cassandraVersion;
         this.consistencyLevel = consistencyLevel;
-        validPair = TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns(
-        new String[]{"id", "date", "course", "marks"},
-        new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType, 
DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType},
-        new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), 
mockCqlType(VARCHAR), mockCqlType(INT)});
-        StructType validDataFrameSchema = validPair.getKey();
-        ImmutableMap<String, CqlField.CqlType> validCqlColumns = 
validPair.getValue();
-        String[] partitionKeyColumns = {"id", "date"};
-        String[] primaryKeyColumnNames = {"id", "date"};
+        this.validPair = validPair;
+        StructType validDataFrameSchema = this.validPair.getKey();
+        ImmutableMap<String, CqlField.CqlType> validCqlColumns = 
this.validPair.getValue();
         ColumnType<?>[] partitionKeyColumnTypes = {ColumnTypes.INT, 
ColumnTypes.INT};
-        this.schema = new TableSchemaTestCommon.MockTableSchemaBuilder()
-                      .withCqlColumns(validCqlColumns)
-                      .withPartitionKeyColumns(partitionKeyColumns)
-                      .withPrimaryKeyColumnNames(primaryKeyColumnNames)
-                      .withCassandraVersion(cassandraVersion)
-                      .withPartitionKeyColumnTypes(partitionKeyColumnTypes)
-                      .withWriteMode(WriteMode.INSERT)
-                      .withDataFrameSchema(validDataFrameSchema)
-                      .withTTLSetting(ttlOption)
-                      .build();
+        TTLOption ttlOption = TTLOption.forever();
+        TableSchemaTestCommon.MockTableSchemaBuilder builder = new 
TableSchemaTestCommon.MockTableSchemaBuilder(CassandraBridgeFactory.get(cassandraVersion))
+                                                               
.withCqlColumns(validCqlColumns)
+                                                               
.withPartitionKeyColumns(partitionKeyColumns)
+                                                               
.withPrimaryKeyColumnNames(primaryKeyColumnNames)
+                                                               
.withCassandraVersion(cassandraVersion)
+                                                               
.withPartitionKeyColumnTypes(partitionKeyColumnTypes)
+                                                               
.withWriteMode(WriteMode.INSERT)
+                                                               
.withDataFrameSchema(validDataFrameSchema)
+                                                               
.withTTLSetting(ttlOption);
+        if (quoteIdentifiers())
+        {
+            builder.withQuotedIdentifiers();
+        }
+        this.schema = builder.build();
         this.jobId = java.util.UUID.randomUUID();
     }
 
@@ -132,11 +163,6 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
         this.timeProvider = timeProvider;
     }
 
-    public MockBulkWriterContext(CassandraRing<RingInstance> ring)
-    {
-        this(ring, DEFAULT_CASSANDRA_VERSION, 
ConsistencyLevel.CL.LOCAL_QUORUM);
-    }
-
     @Override
     public void shutdown()
     {
@@ -412,9 +438,21 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
     }
 
     @Override
-    public String getFullTableName()
+    public boolean quoteIdentifiers()
+    {
+        return quoteIdentifiers;
+    }
+
+    @Override
+    public String keyspace()
+    {
+        return "keyspace";
+    }
+
+    @Override
+    public String tableName()
     {
-        return "keyspace.table";
+        return "table";
     }
 
     // Startup Validation
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 940f5a2..90d14fc 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -29,7 +29,9 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Range;
+import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -37,9 +39,19 @@ import org.junit.jupiter.api.io.TempDir;
 
 import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
 import scala.Tuple2;
 
+import static org.apache.cassandra.spark.bulkwriter.MockBulkWriterContext.DEFAULT_CASSANDRA_VERSION;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.VARCHAR;
+import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCqlType;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.matchesPattern;
@@ -84,21 +96,42 @@ public class RecordWriterTest
         validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
     }
 
+    @Test
+    public void testWriteWithMixedCaseColumnNames()
+    {
+        boolean quoteIdentifiers = true;
+        String[] pk = {"ID", "date"};
+        String[] columnNames = {"ID", "date", "course", "limit"};
+
+        Pair<StructType, ImmutableMap<String, CqlField.CqlType>> validPair = 
TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns(
+        columnNames,
+        new DataType[]{DataTypes.IntegerType, DataTypes.DateType, 
DataTypes.StringType, DataTypes.IntegerType},
+        new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), 
mockCqlType(VARCHAR), mockCqlType(INT)});
+
+        MockBulkWriterContext writerContext = new MockBulkWriterContext(ring,
+                                                                        
DEFAULT_CASSANDRA_VERSION,
+                                                                        
ConsistencyLevel.CL.LOCAL_QUORUM,
+                                                                        
validPair,
+                                                                        pk,
+                                                                        pk,
+                                                                        
quoteIdentifiers);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
+        validateSuccessfulWrite(writerContext, data, columnNames);
+    }
+
     @Test
     public void testWriteWithConstantTTL()
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(ring);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, 
false, false);
-        validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+        validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
     }
 
     @Test
     public void testWriteWithTTLColumn()
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(ring);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, 
true, false);
         String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
-        validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
+        validateSuccessfulWrite(writerContext, data, columnNamesWithTtl);
     }
 
     @Test
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index 1c00d83..5cb184f 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.jupiter.api.Test;
 
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.spark.common.schema.ColumnType;
 import org.apache.cassandra.spark.common.schema.ColumnTypes;
 import org.apache.cassandra.spark.data.CqlField;
@@ -67,7 +68,7 @@ public class TableSchemaTest
     {
         TableSchema schema = getValidSchemaBuilder()
                 .build();
-        assertThat(schema.modificationStatement,
+        assertThat(trimUniqueTableName(schema.modificationStatement),
                    is(equalTo("INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks);")));
     }
 
@@ -75,14 +76,16 @@ public class TableSchemaTest
     public void testInsertStatementWithConstantTTL()
     {
         TableSchema schema = 
getValidSchemaBuilder().withTTLSetting(TTLOption.from("1000")).build();
-        assertThat(schema.modificationStatement, is(equalTo("INSERT INTO 
test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 
1000;")));
+        assertThat(trimUniqueTableName(schema.modificationStatement),
+                   is(equalTo("INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks) USING TTL 1000;")));
     }
 
     @Test
     public void testInsertStatementWithTTLColumn()
     {
         TableSchema schema = 
getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).build();
-        assertThat(schema.modificationStatement, is(equalTo("INSERT INTO 
test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 
:ttl;")));
+        assertThat(trimUniqueTableName(schema.modificationStatement),
+                   is(equalTo("INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks) USING TTL :ttl;")));
     }
 
     @Test
@@ -90,7 +93,7 @@ public class TableSchemaTest
     {
         TableSchema schema = 
getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("1000")).build();
         String expectedQuery = "INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks) USING TIMESTAMP 1000;";
-        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+        assertThat(trimUniqueTableName(schema.modificationStatement), 
is(equalTo(expectedQuery)));
     }
 
     @Test
@@ -98,14 +101,14 @@ public class TableSchemaTest
     {
         TableSchema schema = 
getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("timestamp")).build();
         String expectedQuery = "INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp;";
-        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+        assertThat(trimUniqueTableName(schema.modificationStatement), 
is(equalTo(expectedQuery)));
     }
     @Test
     public void testInsertStatementWithTTLAndTimestampColumn()
     {
         TableSchema schema = 
getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).withTimeStampSetting(TimestampOption.from("timestamp")).build();
         String expectedQuery = "INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp AND TTL :ttl;";
-        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+        assertThat(trimUniqueTableName(schema.modificationStatement), 
is(equalTo(expectedQuery)));
     }
 
     @Test
@@ -120,7 +123,7 @@ public class TableSchemaTest
         TableSchema schema = getValidSchemaBuilder()
                 .withWriteMode(WriteMode.DELETE_PARTITION)
                 .build();
-        assertThat(schema.modificationStatement, is(equalTo("DELETE FROM 
test.test where id=?;")));
+        assertThat(trimUniqueTableName(schema.modificationStatement), 
is(equalTo("DELETE FROM test.test where id=?;")));
     }
 
     @Test
@@ -222,7 +225,7 @@ public class TableSchemaTest
 
     private TableSchemaTestCommon.MockTableSchemaBuilder 
getValidSchemaBuilder()
     {
-        return new TableSchemaTestCommon.MockTableSchemaBuilder()
+        return new 
TableSchemaTestCommon.MockTableSchemaBuilder(CassandraBridgeFactory.get(cassandraVersion))
                 .withCqlColumns(validCqlColumns)
                 .withPartitionKeyColumns(partitionKeyColumns)
                 .withPrimaryKeyColumnNames(primaryKeyColumnNames)
@@ -231,4 +234,10 @@ public class TableSchemaTest
                 .withWriteMode(WriteMode.INSERT)
                 .withDataFrameSchema(validDataFrameSchema);
     }
+
+    // Removes the unique table name to make validation consistent
+    private static String trimUniqueTableName(String statement)
+    {
+        return statement.replaceAll("test.test_table_\\d+", "test.test");
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index 18ce2a6..ef937ce 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
@@ -30,6 +31,8 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.spark.common.schema.ColumnType;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.spark.sql.types.DataTypes;
@@ -131,12 +134,13 @@ public final class TableSchemaTestCommon
         Pair<StructType, ImmutableMap<String, CqlField.CqlType>> pair = 
buildMatchedDataframeAndCqlColumns(fieldNames, sparkTypes, driverTypes);
         ImmutableMap<String, CqlField.CqlType> cqlColumns = pair.getValue();
         StructType dataFrameSchema = pair.getKey();
+        String cassandraVersion = "4.0.0";
         return
-            new MockTableSchemaBuilder()
+            new 
MockTableSchemaBuilder(CassandraBridgeFactory.get(cassandraVersion))
                 .withCqlColumns(cqlColumns)
                 .withPartitionKeyColumns(partitionKeyColumns)
                 .withPrimaryKeyColumnNames(primaryKeyColumnNames)
-                .withCassandraVersion("3.0.24.8")
+                .withCassandraVersion(cassandraVersion)
                 .withPartitionKeyColumnTypes(partitionKeyColumnTypes)
                 .withWriteMode(WriteMode.INSERT)
                 .withDataFrameSchema(dataFrameSchema)
@@ -145,6 +149,7 @@ public final class TableSchemaTestCommon
 
     public static class MockTableSchemaBuilder
     {
+        private final CassandraBridge bridge;
         private ImmutableMap<String, CqlField.CqlType> cqlColumns;
         private String[] partitionKeyColumns;
         private String[] primaryKeyColumnNames;
@@ -154,11 +159,17 @@ public final class TableSchemaTestCommon
         private WriteMode writeMode = null;
         private TTLOption ttlOption = TTLOption.forever();
         private TimestampOption timestampOption = TimestampOption.now();
+        private boolean quoteIdentifiers = false;
+
+        public MockTableSchemaBuilder(CassandraBridge bridge)
+        {
+            this.bridge = bridge;
+        }
 
         public MockTableSchemaBuilder withCqlColumns(@NotNull Map<String, 
CqlField.CqlType> cqlColumns)
         {
             Preconditions.checkNotNull(cqlColumns, "cqlColumns cannot be 
null");
-            Preconditions.checkArgument(cqlColumns.size() > 0, "cqlColumns 
cannot be empty");
+            Preconditions.checkArgument(!cqlColumns.isEmpty(), "cqlColumns 
cannot be empty");
             this.cqlColumns = ImmutableMap.copyOf(cqlColumns);
             return this;
         }
@@ -182,7 +193,7 @@ public final class TableSchemaTestCommon
         public MockTableSchemaBuilder withCassandraVersion(@NotNull String 
cassandraVersion)
         {
             Preconditions.checkNotNull(cassandraVersion, "cassandraVersion 
cannot be null");
-            Preconditions.checkArgument(cassandraVersion.length() > 0, 
"cassandraVersion cannot be an empty string");
+            Preconditions.checkArgument(!cassandraVersion.isEmpty(), 
"cassandraVersion cannot be an empty string");
             this.cassandraVersion = cassandraVersion;
             return this;
         }
@@ -222,6 +233,12 @@ public final class TableSchemaTestCommon
             return this;
         }
 
+        public MockTableSchemaBuilder withQuotedIdentifiers()
+        {
+            this.quoteIdentifiers = true;
+            return this;
+        }
+
         public TableSchema build()
         {
             Objects.requireNonNull(cqlColumns,
@@ -238,11 +255,13 @@ public final class TableSchemaTestCommon
                                    "writeMode cannot be null. Please provide 
the write mode by calling #withWriteMode");
             Objects.requireNonNull(dataFrameSchema,
                                    "dataFrameSchema cannot be null. Please 
provide the write mode by calling #withDataFrameSchema");
-            MockTableInfoProvider tableInfoProvider = new 
MockTableInfoProvider(cqlColumns,
+            MockTableInfoProvider tableInfoProvider = new 
MockTableInfoProvider(bridge,
+                                                                               
 cqlColumns,
                                                                                
 partitionKeyColumns,
                                                                                
 partitionKeyColumnTypes,
                                                                                
 primaryKeyColumnNames,
-                                                                               
 cassandraVersion);
+                                                                               
 cassandraVersion,
+                                                                               
 quoteIdentifiers);
             if (ttlOption.withTTl() && ttlOption.columnName() != null)
             {
                 dataFrameSchema = dataFrameSchema.add("ttl", 
DataTypes.IntegerType);
@@ -251,31 +270,41 @@ public final class TableSchemaTestCommon
             {
                 dataFrameSchema = dataFrameSchema.add("timestamp", 
DataTypes.IntegerType);
             }
-            return new TableSchema(dataFrameSchema, tableInfoProvider, 
writeMode, ttlOption, timestampOption);
+            return new TableSchema(dataFrameSchema, tableInfoProvider, 
writeMode, ttlOption, timestampOption, cassandraVersion, quoteIdentifiers);
         }
     }
 
     public static class MockTableInfoProvider implements TableInfoProvider
     {
+        public static final String TEST_TABLE_PREFIX = "test_table_";
+        public static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0);
+        private final CassandraBridge bridge;
         private final ImmutableMap<String, CqlField.CqlType> cqlColumns;
         private final String[] partitionKeyColumns;
         private final ColumnType[] partitionKeyColumnTypes;
         private final String[] primaryKeyColumnNames;
+        private final String uniqueTableName;
         Map<String, CqlField.CqlType> columns;
         private final String cassandraVersion;
+        private final boolean quoteIdentifiers;
 
-        public MockTableInfoProvider(ImmutableMap<String, CqlField.CqlType> cqlColumns,
+        public MockTableInfoProvider(CassandraBridge bridge,
+                                     ImmutableMap<String, CqlField.CqlType> cqlColumns,
                                      String[] partitionKeyColumns,
                                      ColumnType[] partitionKeyColumnTypes,
                                      String[] primaryKeyColumnNames,
-                                     String cassandraVersion)
+                                     String cassandraVersion,
+                                     boolean quoteIdentifiers)
         {
+            this.bridge = bridge;
             this.cqlColumns = cqlColumns;
             this.partitionKeyColumns = partitionKeyColumns;
             this.partitionKeyColumnTypes = partitionKeyColumnTypes;
             this.primaryKeyColumnNames = primaryKeyColumnNames;
             columns = cqlColumns;
             this.cassandraVersion = cassandraVersion.replaceAll("(\\w+-)*cassandra-", "");
+            this.quoteIdentifiers = quoteIdentifiers;
+            this.uniqueTableName = TEST_TABLE_PREFIX + TEST_TABLE_ID.getAndIncrement();
         }
 
         @Override
@@ -306,9 +335,9 @@ public final class TableSchemaTestCommon
         public String getCreateStatement()
         {
             String keyDef = getKeyDef();
-            String createStatement = "CREATE TABLE test.test (" + cqlColumns.entrySet()
+            String createStatement = "CREATE TABLE test." + uniqueTableName + " (" + cqlColumns.entrySet()
                                             .stream()
-                                            .map(column -> column.getKey() + " " + column.getValue().name())
+                                            .map(column -> maybeQuoteIdentifierIfRequested(column.getKey()) + " " + column.getValue().name())
                                             .collect(Collectors.joining(",\n")) + ", " + keyDef + ") "
                                    + "WITH COMPRESSION = {'class': '" + getCompression() + "'};";
             System.out.println("Create Table:" + createStatement);
@@ -333,8 +362,12 @@ public final class TableSchemaTestCommon
             List<String> partitionColumns = Lists.newArrayList(partitionKeyColumns);
             List<String> primaryColumns = Lists.newArrayList(primaryKeyColumnNames);
             primaryColumns.removeAll(partitionColumns);
-            String partitionKey = "(" + String.join(",", partitionKeyColumns) + ")";
-            String clusteringKey = String.join(",", primaryColumns);
+            String partitionKey = Arrays.stream(partitionKeyColumns)
+                                        .map(this::maybeQuoteIdentifierIfRequested)
+                                        .collect(Collectors.joining(",", "(", ")"));
+            String clusteringKey = primaryColumns.stream()
+                                                 .map(this::maybeQuoteIdentifierIfRequested)
+                                                 .collect(Collectors.joining(","));
             return "PRIMARY KEY (" + partitionKey + clusteringKey + ")";
         }
 
@@ -347,7 +380,7 @@ public final class TableSchemaTestCommon
         @Override
         public String getName()
         {
-            return "test";
+            return uniqueTableName;
         }
 
         @Override
@@ -367,5 +400,12 @@ public final class TableSchemaTestCommon
         {
             return cqlColumns.keySet().asList();
         }
+
+        private String maybeQuoteIdentifierIfRequested(String identifier)
+        {
+            return quoteIdentifiers
+                   ? bridge.maybeQuoteIdentifier(identifier)
+                   : identifier;
+        }
     }
 }
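
A note on the quoting helper introduced above: maybeQuoteIdentifierIfRequested delegates to CassandraBridge.maybeQuoteIdentifier, whose implementation is not part of this diff. As a rough sketch of the behavior such a helper provides (an illustrative assumption, not the bridge's actual code), it leaves already-safe identifiers untouched and double-quotes the rest, doubling any embedded quotes:

    // Illustrative sketch only; the real logic lives in CassandraBridge.maybeQuoteIdentifier
    // and also handles CQL reserved words, which this sketch omits.
    static String maybeQuoteIdentifier(String identifier)
    {
        if (identifier.startsWith("\"") && identifier.endsWith("\""))
        {
            return identifier; // already quoted
        }
        if (identifier.matches("[a-z][a-z0-9_]*"))
        {
            return identifier; // lower-case identifiers need no quoting
        }
        return '"' + identifier.replace("\"", "\"\"") + '"'; // quote, escaping embedded quotes
    }
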
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
index d346ef3..eb4beb1 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
@@ -39,7 +39,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.Session;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -49,6 +48,7 @@ import io.vertx.core.eventbus.Message;
 import io.vertx.core.eventbus.MessageConsumer;
 import io.vertx.core.json.JsonObject;
 import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
@@ -68,7 +68,7 @@ import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSAND
 public abstract class IntegrationTestBase
 {
     private static final int MAX_KEYSPACE_TABLE_WAIT_ATTEMPTS = 100;
-    private static final long MAX_KEYSPACE_TABLE_TIME = 100L;
+    private static final long MAX_KEYSPACE_TABLE_TIME = 200L;
     protected Logger logger = LoggerFactory.getLogger(this.getClass());
     protected Vertx vertx;
     protected Server server;
@@ -229,41 +229,37 @@ public abstract class IntegrationTestBase
     }
 
     /**
-     * Waits for the specified keyspace/table to be available.
+     * Waits for the specified keyspace to be available in Sidecar.
      * Empirically, this loop usually executes either zero or one time before completing.
      * However, we set a fairly high number of retries to account for variability in build machines.
      *
-     * @param keyspaceName the keyspace for which to wait
-     * @param tableName    the table in the keyspace for which to wait
+     * @param keyspace the keyspace for which to wait
      */
-    protected void waitForKeyspaceAndTable(String keyspaceName, String tableName)
+    protected void waitUntilSidecarPicksUpSchemaChange(String keyspace)
+    protected void waitUntilSidecarPicksUpSchemaChange(String keyspace)
     {
-        int numInstances = sidecarTestContext.instancesConfig().instances().size();
         int retries = MAX_KEYSPACE_TABLE_WAIT_ATTEMPTS;
-        boolean sidecarMissingSchema = true;
-        while (sidecarMissingSchema && retries-- > 0)
+        WebClient client = WebClient.create(vertx);
+        while (retries-- > 0)
         {
-            sidecarMissingSchema = false;
-            for (int i = 0; i < numInstances; i++)
+            try
             {
-                KeyspaceMetadata keyspace = sidecarTestContext.session(i)
-                                                              .getCluster()
-                                                              .getMetadata()
-                                                              .getKeyspace(keyspaceName);
-                sidecarMissingSchema |= (keyspace == null || keyspace.getTable(tableName) == null);
+                client.get(server.actualPort(), "localhost", "/api/v1/keyspaces/" + keyspace + "/schema")
+                      .expect(ResponsePredicate.SC_OK)
+                      .send()
+                      .toCompletionStage()
+                      .toCompletableFuture()
+                      .get(MAX_KEYSPACE_TABLE_TIME, TimeUnit.MILLISECONDS);
+                logger.info("Schema is ready in Sidecar");
+                client.close();
+                return;
             }
-            if (sidecarMissingSchema)
+            catch (Exception exception)
             {
-                logger.info("Keyspace/table {}/{} not yet available - waiting...", keyspaceName, tableName);
+                logger.info("Waiting for schema to propagate to Sidecar");
                 Uninterruptibles.sleepUninterruptibly(MAX_KEYSPACE_TABLE_TIME, TimeUnit.MILLISECONDS);
             }
-            else
-            {
-                return;
-            }
         }
-        throw new RuntimeException(
-        String.format("Keyspace/table %s/%s did not become visible on all sidecar instances",
-                      keyspaceName, tableName));
+        client.close();
+        throw new RuntimeException(String.format("Keyspace %s did not become visible in Sidecar", keyspace));
     }
 }
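
The reworked helper above polls Sidecar's own schema endpoint rather than inspecting driver metadata on each instance, so the test now waits on exactly what the analytics library will query. In the worst case the loop runs for roughly MAX_KEYSPACE_TABLE_WAIT_ATTEMPTS * 2 * MAX_KEYSPACE_TABLE_TIME = 100 * 2 * 200 ms = 40 seconds, since each failed attempt spends up to 200 ms awaiting the response and another 200 ms sleeping. The intended call pattern, mirroring the updated call sites in the tests below (statement contents elided):

    // Assumed test flow: create the schema, then block until Sidecar serves it
    createTestKeyspace(keyspace, ImmutableMap.of("datacenter1", 1));
    createTestTable(createTableStatement);
    waitUntilSidecarPicksUpSchemaChange(keyspace);
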
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
index 59007a1..0ee434d 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java
@@ -64,7 +64,7 @@ public class BulkWriteTtlTest extends IntegrationTestBase
                              + "          marks BIGINT\n"
                              + "     )  WITH default_time_to_live = 1;"
         );
-        waitForKeyspaceAndTable(keyspace, table);
+        waitUntilSidecarPicksUpSchemaChange(keyspace);
         boolean addTTLColumn = false;
         boolean addTimestampColumn = false;
         IntegrationTestJob.builder((recordNum) -> generateCourse(recordNum, null, null),
@@ -96,7 +96,7 @@ public class BulkWriteTtlTest extends IntegrationTestBase
                              + "          marks BIGINT\n"
                              + "     );"
         );
-        waitForKeyspaceAndTable(keyspace, table);
+        waitUntilSidecarPicksUpSchemaChange(keyspace);
         Map<String, String> writerOptions = ImmutableMap.of(WriterOptions.TTL.name(), TTLOption.constant(1));
         boolean addTTLColumn = false;
         boolean addTimestampColumn = false;
@@ -129,7 +129,7 @@ public class BulkWriteTtlTest extends IntegrationTestBase
                              + "          marks BIGINT\n"
                              + "     );"
         );
-        waitForKeyspaceAndTable(keyspace, table);
+        waitUntilSidecarPicksUpSchemaChange(keyspace);
         Map<String, String> writerOptions = ImmutableMap.of(WriterOptions.TTL.name(), TTLOption.perRow("ttl"));
         boolean addTTLColumn = true;
         boolean addTimestampColumn = false;
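
The two TTL flavors exercised in this file differ only in where the TTL value comes from; condensed, the writer options look like this (mirroring the test code above, with "ttl" as the per-row dataframe column name):

    // Constant TTL: every row written by the job expires after the same number of seconds
    Map<String, String> constantTtl = ImmutableMap.of(WriterOptions.TTL.name(), TTLOption.constant(1));

    // Per-row TTL: each row carries its own TTL value in a dataframe column named "ttl"
    Map<String, String> perRowTtl = ImmutableMap.of(WriterOptions.TTL.name(), TTLOption.perRow("ttl"));
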
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java
similarity index 80%
rename from cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersTest.java
rename to cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java
index c2aa4be..0cd3444 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java
@@ -22,17 +22,11 @@ package org.apache.cassandra.analytics;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import io.vertx.ext.web.client.WebClient;
-import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
@@ -45,22 +39,22 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests the bulk reader behavior when requiring quoted identifiers for keyspace, table name, and column names.
- * These tests exercise a full integration test, which includes testing Sidecar behavior when dealing with quoted
+ *
+ * <p>These tests exercise the full integration path, which includes testing Sidecar behavior when dealing with quoted
  * identifiers.
  */
 @ExtendWith(VertxExtension.class)
-class QuoteIdentifiersTest extends SparkIntegrationTestBase
+class QuoteIdentifiersReadTest extends SparkIntegrationTestBase
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(QuoteIdentifiersTest.class);
 
-    @CassandraIntegrationTest(nodesPerDc = 3, gossip = true)
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
     void testMixedCaseKeyspace(VertxTestContext context)
     {
         QualifiedName qualifiedTableName = uniqueTestTableFullName("QuOtEd_KeYsPaCe");
         runTestScenario(context, qualifiedTableName);
     }
 
-    @CassandraIntegrationTest(nodesPerDc = 3, gossip = true)
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
     void testReservedWordKeyspace(VertxTestContext context)
     {
         // keyspace is a reserved word
@@ -68,27 +62,27 @@ class QuoteIdentifiersTest extends SparkIntegrationTestBase
         runTestScenario(context, qualifiedTableName);
     }
 
-    @CassandraIntegrationTest(nodesPerDc = 3, gossip = true)
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
     void testMixedCaseTable(VertxTestContext context)
     {
         QualifiedName qualifiedTableName = uniqueTestTableFullName(TEST_KEYSPACE, "QuOtEd_TaBlE");
         runTestScenario(context, qualifiedTableName);
     }
 
-    @CassandraIntegrationTest(nodesPerDc = 3, gossip = true)
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
     void testReservedWordTable(VertxTestContext context)
     {
         // table is a reserved word
         runTestScenario(context, new QualifiedName(TEST_KEYSPACE, "table"));
     }
 
-    @CassandraIntegrationTest(nodesPerDc = 3, gossip = true)
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
     void testReadComplexSchema(VertxTestContext context)
     {
         QualifiedName tableName = uniqueTestTableFullName("QuOtEd_KeYsPaCe", "QuOtEd_TaBlE");
 
         String quotedKeyspace = tableName.maybeQuotedKeyspace();
-        createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 3));
+        createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 1));
 
         // Create UDT
         String createUdtQuery = "CREATE TYPE " + quotedKeyspace + ".\"UdT1\" (\"TimE\" bigint, \"limit\" int);";
@@ -129,7 +123,7 @@ class QuoteIdentifiersTest extends SparkIntegrationTestBase
     {
         String quotedKeyspace = tableName.maybeQuotedKeyspace();
 
-        createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 3));
+        createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 1));
         createTestTable(String.format("CREATE TABLE IF NOT EXISTS %s (\"IdEnTiFiEr\" text, IdEnTiFiEr int, PRIMARY KEY(\"IdEnTiFiEr\"));",
                                       tableName));
         List<String> dataset = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
@@ -180,29 +174,4 @@ class QuoteIdentifiersTest extends SparkIntegrationTestBase
                               .execute(query, ConsistencyLevel.ALL);
         }
     }
-
-    void waitUntilSidecarPicksUpSchemaChange(String quotedKeyspace)
-    {
-        WebClient client = WebClient.create(vertx);
-        while (true)
-        {
-            try
-            {
-                client.get(server.actualPort(), "localhost", "/api/v1/keyspaces/" + quotedKeyspace + "/schema")
-                      .expect(ResponsePredicate.SC_OK)
-                      .send()
-                      .toCompletionStage()
-                      .toCompletableFuture()
-                      .get(30, TimeUnit.SECONDS);
-                LOGGER.info("Schema is ready in Sidecar");
-                break;
-            }
-            catch (Exception exception)
-            {
-                LOGGER.info("Waiting for schema to propagate to Sidecar");
-                Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);
-            }
-        }
-        client.close();
-    }
 }
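
The schema used in runTestScenario above is worth a second look: it declares both "IdEnTiFiEr" (quoted, case preserved) and IdEnTiFiEr (unquoted, folded to lower case), which can only coexist in one table because quoting makes them distinct columns. A minimal illustration of the folding rule (hypothetical snippet; session is a connected com.datastax.driver.core.Session and ks an existing keyspace):

    // Unquoted identifiers fold to lower case; quoted identifiers keep their case
    session.execute("CREATE TABLE ks.t (\"IdEnTiFiEr\" text PRIMARY KEY, IdEnTiFiEr int);");
    // The unquoted spelling below resolves to the lower-cased column, not the quoted one
    session.execute("SELECT \"IdEnTiFiEr\", identifier FROM ks.t;");
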
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java
new file mode 100644
index 0000000..63b947a
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.junit5.VertxExtension;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the bulk writer behavior when requiring quoted identifiers for keyspace, table name, and column names.
+ *
+ * <p>These tests exercise the full integration path, which includes testing Sidecar behavior when dealing with quoted
+ * identifiers.
+ */
+@ExtendWith(VertxExtension.class)
+class QuoteIdentifiersWriteTest extends SparkIntegrationTestBase
+{
+    static final int ROW_COUNT = 10_000;
+
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
+    void testWriteWithMixedCaseKeyspaceName()
+    {
+        QualifiedName qualifiedTableName = uniqueTestTableFullName("QuOtEd_KeYsPaCe");
+        runWriteTestScenario(qualifiedTableName);
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
+    void testWriteWithReservedWordKeyspaceName()
+    {
+        // keyspace is a reserved word
+        QualifiedName qualifiedTableName = uniqueTestTableFullName("keyspace");
+        runWriteTestScenario(qualifiedTableName);
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
+    void testWriteWithMixedCaseTableName()
+    {
+        QualifiedName qualifiedTableName = uniqueTestTableFullName(TEST_KEYSPACE, "QuOtEd_TaBlE");
+        runWriteTestScenario(qualifiedTableName);
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 1, gossip = true)
+    void testWriteWithReservedWordTableName()
+    {
+        // table is a reserved word
+        runWriteTestScenario(new QualifiedName(TEST_KEYSPACE, "table"));
+    }
+
+    void runWriteTestScenario(QualifiedName tableName)
+    {
+        String quotedKeyspace = tableName.maybeQuotedKeyspace();
+        createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 1));
+        createTestTable(String.format("CREATE TABLE IF NOT EXISTS %s (\"IdEnTiFiEr\" bigint, course blob, \"limit\" bigint, PRIMARY KEY(\"IdEnTiFiEr\"));",
+                                      tableName));
+        waitUntilSidecarPicksUpSchemaChange(tableName.maybeQuotedKeyspace());
+
+        SparkSession spark = getOrCreateSparkSession();
+        SparkContext sc = spark.sparkContext();
+        JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc);
+        SQLContext sql = spark.sqlContext();
+
+        int parallelism = sc.defaultParallelism();
+        JavaRDD<Row> rows = genDataset(javaSparkContext, ROW_COUNT, parallelism);
+        Dataset<Row> df = sql.createDataFrame(rows, writeSchema());
+
+        bulkWriterDataFrameWriter(df, tableName).option("quote_identifiers", "true")
+                                                .save();
+        validateWrites(tableName, rows);
+    }
+
+    private void validateWrites(QualifiedName tableName, JavaRDD<Row> rowsWritten)
+    {
+        // collect the entries read back from Cassandra into a set
+        Set<String> actualEntries = Arrays.stream(sidecarTestContext.cassandraTestContext()
+                                                                    .cluster()
+                                                                    .coordinator(1)
+                                                                    .execute(String.format("SELECT * FROM %s;", tableName), ConsistencyLevel.LOCAL_QUORUM))
+                                          .map((Object[] columns) -> String.format("%s:%s:%s",
+                                                                                   new String(((ByteBuffer) columns[1]).array(), StandardCharsets.UTF_8),
+                                                                                   columns[0],
+                                                                                   columns[2]))
+                                          .collect(Collectors.toSet());
+
+        // Number of entries in Cassandra must match the original datasource
+        assertThat(actualEntries.size()).isEqualTo(rowsWritten.count());
+
+        // remove from actual entries to make sure that the data read is the same as the data written
+        rowsWritten.collect()
+                   .forEach(row -> {
+                       String key = String.format("%s:%d:%d",
+                                                  new String((byte[]) row.get(0), StandardCharsets.UTF_8),
+                                                  row.getLong(1),
+                                                  row.getLong(2));
+                       assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries")
+                                                            .isTrue();
+                   });
+
+        // If this fails, it means there was more data in the database than we expected
+        assertThat(actualEntries).as("All entries are expected to be read from database")
+                                 .isEmpty();
+    }
+
+    static StructType writeSchema()
+    {
+        return new StructType()
+               .add("course", BinaryType, false)
+               .add("IdEnTiFiEr", LongType, false) // case-sensitive struct
+               .add("limit", LongType, false); // limit is a reserved word in 
Cassandra
+    }
+
+    private static JavaRDD<Row> genDataset(JavaSparkContext sc, int recordCount, Integer parallelism)
+    {
+        long recordsPerPartition = recordCount / parallelism;
+        long remainder = recordCount - (recordsPerPartition * parallelism);
+        List<Integer> seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList());
+        return sc.parallelize(seq, parallelism).mapPartitionsWithIndex(
+        (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
+            long firstRecordNumber = index * recordsPerPartition;
+            // the last partition picks up the remainder so that exactly recordCount rows are generated
+            long recordsToGenerate = index == parallelism - 1 ? recordsPerPartition + remainder : recordsPerPartition;
+            return LongStream.range(0, recordsToGenerate).mapToObj(offset -> {
+                long limit = firstRecordNumber + offset;
+                return RowFactory.create(("course-" + limit).getBytes(StandardCharsets.UTF_8), limit, limit);
+            }).iterator();
+        }, false);
+    }
+}
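
For end users, the new quote_identifiers option travels through the ordinary DataFrameWriter options, so a bulk write to a mixed-case table looks roughly like the following (a sketch with placeholder Sidecar endpoints; the option set mirrors bulkWriterDataFrameWriter further below):

    // Hypothetical user-side code; host names and port are placeholders
    df.write()
      .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
      .option("sidecar_instances", "sidecar-1,sidecar-2")
      .option("keyspace", "QuOtEd_KeYsPaCe") // mixed case, so quoting is required
      .option("table", "QuOtEd_TaBlE")
      .option("local_dc", "datacenter1")
      .option("bulk_writer_cl", "LOCAL_QUORUM")
      .option("quote_identifiers", "true") // opt in to identifier quoting
      .mode("append")
      .save();
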
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkBulkWriterSimpleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkBulkWriterSimpleTest.java
index 916b243..cd16930 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkBulkWriterSimpleTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkBulkWriterSimpleTest.java
@@ -57,7 +57,7 @@ public class SparkBulkWriterSimpleTest extends IntegrationTestBase
                              + "          course BLOB,\n"
                              + "          marks BIGINT\n"
                              + "     );");
-        waitForKeyspaceAndTable(keyspace, table);
+        waitUntilSidecarPicksUpSchemaChange(keyspace);
         Map<String, String> writerOptions = new HashMap<>();
         // A constant timestamp and TTL can be used by adding the following options to the writerOptions map
         // writerOptions.put(WriterOptions.TTL.name(), TTLOption.constant(20));
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java
index 7d09844..a33bc78 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.analytics;
 
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
@@ -28,6 +29,9 @@ import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 
@@ -52,7 +56,7 @@ public class SparkIntegrationTestBase extends IntegrationTestBase
         int numCores = coresPerExecutor * numExecutors;
 
         return sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource")
-                  .option("sidecar_instances", "localhost,localhost2,localhost3")
+                  .option("sidecar_instances", "localhost")
                   .option("keyspace", tableName.keyspace()) // unquoted
                   .option("table", tableName.table()) // unquoted
                   .option("DC", "datacenter1")
@@ -64,12 +68,27 @@ public class SparkIntegrationTestBase extends IntegrationTestBase
                   .option("sidecar_port", server.actualPort());
     }
 
+    protected DataFrameWriter<Row> bulkWriterDataFrameWriter(Dataset<Row> df, QualifiedName tableName)
+    {
+        String sidecarInstances = sidecarTestContext.instancesConfig().instances().stream().map(f -> f.host()).collect(Collectors.joining(","));
+        return df.write()
+                 .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
+                 .option("sidecar_instances", sidecarInstances)
+                 .option("keyspace", tableName.keyspace())
+                 .option("table", tableName.table())
+                 .option("local_dc", "datacenter1")
+                 .option("bulk_writer_cl", "LOCAL_QUORUM")
+                 .option("number_splits", "-1")
+                 .option("sidecar_port", server.actualPort())
+                 .mode("append");
+    }
+
     protected SparkConf getOrCreateSparkConf()
     {
         if (sparkConf == null)
         {
             sparkConf = new SparkConf()
-                        .setAppName("Integration test Spark Cassandra Bulk Reader Job")
+                        .setAppName("Integration test Spark Cassandra Bulk Analytics Job")
                         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                         // Spark is not case-sensitive by default, but we want to make it case-sensitive for
                         // the quoted identifiers tests where we test mixed case
@@ -87,7 +106,7 @@ public class SparkIntegrationTestBase extends IntegrationTestBase
         {
             sparkSession = SparkSession
                            .builder()
-                           .config(sparkConf)
+                           .config(getOrCreateSparkConf())
                            .getOrCreate();
         }
         return sparkSession;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

