This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new c7c3bbc CASSANDRA-19031: Fix bulk writing when using identifiers that need quotes c7c3bbc is described below commit c7c3bbca2c7cb415b39689e924fa2357c239f043 Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Tue Nov 14 16:28:14 2023 -0800 CASSANDRA-19031: Fix bulk writing when using identifiers that need quotes Cassandra treats all identifiers as lower case unless explicitly quoted by the user (i.e. keyspace names, table names, column names, etc). We can define a case-sensitive identifier or we can use a reserved word as an identifier by quoting it during DDL creation. In the analytics library, bulk writing fails when we encounter these identifiers. In this commit, we fix the issue by properly propagating the information about whether identifiers need to be quoted by exposing a new dataframe option (`quote_identifiers`). When set to `true`, it will _maybe_ quote the keyspace/table/column names and it will properly be able to write data when using mixed-case or reserved words in the identifiers. 
Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19031 --- .circleci/config.yml | 2 + CHANGES.txt | 1 + .../cassandra/bridge/CassandraBridgeFactory.java | 14 +- .../cassandra/spark/bulkwriter/BulkSparkConf.java | 2 + .../bulkwriter/CassandraBulkWriterContext.java | 38 ++++- .../spark/bulkwriter/CassandraClusterInfo.java | 23 ++- .../spark/bulkwriter/CassandraJobInfo.java | 17 +- .../apache/cassandra/spark/bulkwriter/JobInfo.java | 13 +- .../spark/bulkwriter/SidecarDataTransferApi.java | 28 +++- .../cassandra/spark/bulkwriter/TTLOption.java | 6 +- .../cassandra/spark/bulkwriter/TableSchema.java | 59 ++++--- .../spark/bulkwriter/TimestampOption.java | 6 +- .../cassandra/spark/bulkwriter/WriterOptions.java | 7 +- .../spark/bulkwriter/util/SbwKryoRegistrator.java | 2 +- .../spark/bulkwriter/BulkSparkConfTest.java | 11 ++ .../spark/bulkwriter/MockBulkWriterContext.java | 90 +++++++---- .../spark/bulkwriter/RecordWriterTest.java | 41 ++++- .../spark/bulkwriter/TableSchemaTest.java | 25 ++- .../spark/bulkwriter/TableSchemaTestCommon.java | 68 ++++++-- .../sidecar/testing/IntegrationTestBase.java | 46 +++--- .../cassandra/analytics/BulkWriteTtlTest.java | 6 +- ...iersTest.java => QuoteIdentifiersReadTest.java} | 51 ++---- .../analytics/QuoteIdentifiersWriteTest.java | 171 +++++++++++++++++++++ .../analytics/SparkBulkWriterSimpleTest.java | 2 +- .../analytics/SparkIntegrationTestBase.java | 25 ++- 25 files changed, 580 insertions(+), 174 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index df4b720..bdf0a5a 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,6 +58,7 @@ jobs: SPARK_VERSION: "2" SCALA_VERSION: "2.11" JDK_VERSION: "1.8" + INTEGRATION_MAX_PARALLEL_FORKS: 2 - store_artifacts: path: build/test-reports @@ -85,6 +86,7 @@ jobs: SPARK_VERSION: "2" SCALA_VERSION: "2.12" JDK_VERSION: "1.8" + INTEGRATION_MAX_PARALLEL_FORKS: 2 - store_artifacts: path: build/test-reports diff --git a/CHANGES.txt b/CHANGES.txt index 
9aaa0d3..f580f89 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Fix bulk writing when using identifiers that need quotes (CASSANDRA-19031) * Fix bulk reading when using identifiers that need quotes (CASSANDRA-19024) * Remove unused dead code (CASSANDRA-19148) * Get Sidecar port through CassandraContext for more flexibility (CASSANDRA-19903) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java index 7e2cd51..1f69a4c 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/bridge/CassandraBridgeFactory.java @@ -124,7 +124,6 @@ public final class CassandraBridgeFactory Class<CassandraBridge> bridge = (Class<CassandraBridge>) loader.loadClass("org.apache.cassandra.bridge.CassandraBridgeImplementation"); Constructor<CassandraBridge> constructor = bridge.getConstructor(); return constructor.newInstance(); - } catch (IOException | ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException exception) @@ -132,4 +131,17 @@ public final class CassandraBridgeFactory throw new RuntimeException("Failed to create Cassandra bridge for label " + label, exception); } } + + /*** + * Returns the quoted name when the {@code quoteIdentifiers} parameter is {@code true} <i>AND</i> the + * {@code unquotedName} needs to be quoted (i.e. it uses mixed case, or it is a Cassandra reserved word). 
+ * @param bridge the Cassandra bridge + * @param quoteIdentifiers whether identifiers should be quoted + * @param unquotedName the unquoted name to maybe be quoted + * @return the quoted name when the conditions are met + */ + public static String maybeQuotedIdentifier(CassandraBridge bridge, boolean quoteIdentifiers, String unquotedName) + { + return quoteIdentifiers ? bridge.maybeQuoteIdentifier(unquotedName) : unquotedName; + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index b98d4c5..7ea972e 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -126,6 +126,7 @@ public class BulkSparkConf implements Serializable protected final SparkConf conf; public final boolean validateSSTables; public final int commitThreadsPerInstance; + public boolean quoteIdentifiers; protected final int effectiveSidecarPort; protected final int userProvidedSidecarPort; protected boolean useOpenSsl; @@ -166,6 +167,7 @@ public class BulkSparkConf implements Serializable this.ringRetryCount = getInt(RING_RETRY_COUNT, DEFAULT_RING_RETRY_COUNT); this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), null); this.timestamp = MapUtils.getOrDefault(options, WriterOptions.TIMESTAMP.name(), null); + this.quoteIdentifiers = MapUtils.getBoolean(options, WriterOptions.QUOTE_IDENTIFIERS.name(), false, "quote identifiers"); validateEnvironment(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java index f093e13..439b1a6 100644 --- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java @@ -63,6 +63,9 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial { this.conf = conf; this.clusterInfo = clusterInfo; + String lowestCassandraVersion = clusterInfo.getLowestCassandraVersion(); + CassandraBridge bridge = CassandraBridgeFactory.get(lowestCassandraVersion); + CassandraRing<RingInstance> ring = clusterInfo.getRing(true); jobInfo = new CassandraJobInfo(conf, new TokenPartitioner(ring, conf.numberSplits, sparkContext.defaultParallelism(), conf.getCores())); @@ -71,20 +74,20 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial String.format("Keyspace %s is not replicated on datacenter %s", conf.keyspace, conf.localDC)); - String keyspace = conf.keyspace; - String table = conf.table; + String keyspace = jobInfo.keyspace(); + String table = jobInfo.tableName(); String keyspaceSchema = clusterInfo.getKeyspaceSchema(true); - CassandraBridge bridge = CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion()); Partitioner partitioner = clusterInfo.getPartitioner(); - String tableSchema = CqlUtils.extractTableSchema(keyspaceSchema, keyspace, table); + String createTableSchema = CqlUtils.extractTableSchema(keyspaceSchema, keyspace, table); Set<String> udts = CqlUtils.extractUdts(keyspaceSchema, keyspace); ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(keyspaceSchema, keyspace); int indexCount = CqlUtils.extractIndexCount(keyspaceSchema, keyspace, table); - CqlTable cqlTable = bridge.buildSchema(tableSchema, keyspace, replicationFactor, partitioner, udts, null, indexCount); + CqlTable cqlTable = bridge.buildSchema(createTableSchema, keyspace, replicationFactor, partitioner, udts, null, indexCount); - TableInfoProvider tableInfoProvider = new 
CqlTableInfoProvider(tableSchema, cqlTable); - schemaInfo = new CassandraSchemaInfo(new TableSchema(dfSchema, tableInfoProvider, conf.writeMode, conf.getTTLOptions(), conf.getTimestampOptions())); + TableInfoProvider tableInfoProvider = new CqlTableInfoProvider(createTableSchema, cqlTable); + TableSchema tableSchema = initializeTableSchema(conf, dfSchema, tableInfoProvider, lowestCassandraVersion); + schemaInfo = new CassandraSchemaInfo(tableSchema); } public static BulkWriterContext fromOptions(@NotNull SparkContext sparkContext, @@ -180,8 +183,27 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial { if (dataTransferApi == null) { - dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext(), jobInfo, conf); + CassandraBridge bridge = CassandraBridgeFactory.get(clusterInfo.getLowestCassandraVersion()); + dataTransferApi = new SidecarDataTransferApi(clusterInfo.getCassandraContext(), + bridge, + jobInfo, + conf); } return dataTransferApi; } + + @NotNull + protected TableSchema initializeTableSchema(@NotNull BulkSparkConf conf, + @NotNull StructType dfSchema, + TableInfoProvider tableInfoProvider, + String lowestCassandraVersion) + { + return new TableSchema(dfSchema, + tableInfoProvider, + conf.writeMode, + conf.getTTLOptions(), + conf.getTimestampOptions(), + lowestCassandraVersion, + conf.quoteIdentifiers); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java index b490390..19c7137 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java @@ -34,6 +34,8 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CassandraVersionFeatures; import org.apache.cassandra.clients.Sidecar; import org.apache.cassandra.sidecar.client.SidecarInstance; @@ -53,6 +55,8 @@ import org.apache.cassandra.spark.utils.CqlUtils; import org.apache.cassandra.spark.utils.FutureUtils; import org.jetbrains.annotations.NotNull; +import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier; + public class CassandraClusterInfo implements ClusterInfo, Closeable { private static final long serialVersionUID = -6944818863462956767L; @@ -233,7 +237,9 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable protected String getCurrentKeyspaceSchema() throws Exception { - SchemaResponse schemaResponse = getCassandraContext().getSidecarClient().schema(conf.keyspace).get(); + SchemaResponse schemaResponse = getCassandraContext().getSidecarClient() + .schema(maybeQuotedIdentifier(bridge(), conf.quoteIdentifiers, conf.keyspace)) + .get(); return schemaResponse.schema(); } @@ -243,7 +249,11 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable RingResponse ringResponse = getCurrentRingResponse(); List<RingInstance> instances = getSerializableInstances(ringResponse); ReplicationFactor replicationFactor = getReplicationFactor(); - return new CassandraRing<>(getPartitioner(), conf.keyspace, replicationFactor, instances); + + return new CassandraRing<>(getPartitioner(), + maybeQuotedIdentifier(bridge(), conf.quoteIdentifiers, conf.keyspace), + replicationFactor, + instances); } @NotNull @@ -403,7 +413,9 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable private RingResponse getCurrentRingResponse() throws Exception { - return getCassandraContext().getSidecarClient().ring(conf.keyspace).get(); + return getCassandraContext().getSidecarClient() + .ring(maybeQuotedIdentifier(bridge(), conf.quoteIdentifiers, 
conf.keyspace)) + .get(); } private static List<RingInstance> getSerializableInstances(RingResponse ringResponse) @@ -535,6 +547,11 @@ public class CassandraClusterInfo implements ClusterInfo, Closeable return false; } + protected CassandraBridge bridge() + { + return CassandraBridgeFactory.get(getLowestCassandraVersion()); + } + // Startup Validation @Override diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java index 5840c8a..73104fa 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java @@ -113,9 +113,20 @@ public class CassandraJobInfo implements JobInfo } @Override - @NotNull - public String getFullTableName() + public boolean quoteIdentifiers() + { + return conf.quoteIdentifiers; + } + + @Override + public String keyspace() + { + return conf.keyspace; + } + + @Override + public String tableName() { - return conf.keyspace + "." + conf.table; + return conf.table; } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java index f7ac149..15185b3 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java @@ -52,6 +52,17 @@ public interface JobInfo extends Serializable boolean skipExtendedVerify(); - String getFullTableName(); + boolean quoteIdentifiers(); + + String keyspace(); + + String tableName(); + + @NotNull + default String getFullTableName() + { + return keyspace() + "." 
+ tableName(); + } + boolean getSkipClean(); } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java index c3cad7f..8ce5619 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SidecarDataTransferApi.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.sidecar.client.SidecarClient; import org.apache.cassandra.sidecar.client.SidecarInstanceImpl; import org.apache.cassandra.sidecar.client.request.ImportSSTableRequest; @@ -35,6 +36,8 @@ import org.apache.cassandra.spark.common.MD5Hash; import org.apache.cassandra.spark.common.client.ClientException; import org.apache.cassandra.spark.common.model.CassandraInstance; +import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier; + /** * A {@link DataTransferApi} implementation that interacts with Cassandra Sidecar */ @@ -46,14 +49,16 @@ public class SidecarDataTransferApi implements DataTransferApi private static final int SSTABLE_GENERATION_REVERSE_OFFSET = 3; private final transient SidecarClient sidecarClient; + private final CassandraBridge bridge; private final int sidecarPort; private final JobInfo job; private final BulkSparkConf conf; - public SidecarDataTransferApi(CassandraContext cassandraContext, JobInfo job, BulkSparkConf conf) + public SidecarDataTransferApi(CassandraContext cassandraContext, CassandraBridge bridge, JobInfo job, BulkSparkConf conf) { this.sidecarClient = cassandraContext.getSidecarClient(); this.sidecarPort = cassandraContext.sidecarPort(); + this.bridge = bridge; this.job = job; this.conf = conf; } 
@@ -69,17 +74,22 @@ public class SidecarDataTransferApi implements DataTransferApi String uploadId = getUploadId(sessionID, job.getId().toString()); try { - sidecarClient.uploadSSTableRequest(toSidecarInstance(instance), conf.keyspace, conf.table, uploadId, - componentName, fileHash.toString(), - componentFile.toAbsolutePath().toString()).get(); + sidecarClient.uploadSSTableRequest(toSidecarInstance(instance), + maybeQuotedIdentifier(bridge, conf.quoteIdentifiers, conf.keyspace), + maybeQuotedIdentifier(bridge, conf.quoteIdentifiers, conf.table), + uploadId, + componentName, + fileHash.toString(), + componentFile.toAbsolutePath().toString()) + .get(); } catch (ExecutionException | InterruptedException exception) { LOGGER.warn("Failed to upload file={}, keyspace={}, table={}, uploadId={}, componentName={}, instance={}", componentFile, conf.keyspace, conf.table, uploadId, componentName, instance); throw new ClientException( - String.format("Failed to upload file=%s into keyspace=%s, table=%s, componentName=%s with uploadId=%s to instance=%s", - componentFile, conf.keyspace, conf.table, componentName, uploadId, instance), exception); + String.format("Failed to upload file=%s into keyspace=%s, table=%s, componentName=%s with uploadId=%s to instance=%s", + componentFile, conf.keyspace, conf.table, componentName, uploadId, instance), exception); } } @@ -104,7 +114,11 @@ public class SidecarDataTransferApi implements DataTransferApi try { SSTableImportResponse response = - sidecarClient.importSSTableRequest(toSidecarInstance(instance), conf.keyspace, conf.table, uploadId, importOptions).get(); + sidecarClient.importSSTableRequest(toSidecarInstance(instance), + maybeQuotedIdentifier(bridge, conf.quoteIdentifiers, conf.keyspace), + maybeQuotedIdentifier(bridge, conf.quoteIdentifiers, conf.table), + uploadId, + importOptions).get(); if (response.success()) { return new RemoteCommitResult(response.success(), Collections.emptyList(), Collections.singletonList(uploadId), null); 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java index 21a07be..af9c195 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java @@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter; import java.io.Serializable; import java.time.Duration; +import java.util.function.Function; public final class TTLOption implements Serializable { @@ -111,12 +112,11 @@ public final class TTLOption implements Serializable && (ttlColumnName != null || ttlInSeconds != null); } - @Override - public String toString() + public String toCQLString(Function<String, String> maybeQuoteFunction) { if (ttlColumnName != null && !ttlColumnName.isEmpty()) { - return ":" + ttlColumnName; + return ":" + maybeQuoteFunction.apply(ttlColumnName); } if (ttlInSeconds != null) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java index fa3371f..99a6a8d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java @@ -29,13 +29,17 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.base.Preconditions; -import org.apache.cassandra.spark.data.CqlField; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.spark.common.schema.ColumnType; +import org.apache.cassandra.spark.data.CqlField; import org.apache.spark.sql.types.StructType; 
+import static org.apache.cassandra.bridge.CassandraBridgeFactory.maybeQuotedIdentifier; + public class TableSchema implements Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(TableSchema.class); @@ -49,13 +53,22 @@ public class TableSchema implements Serializable private final WriteMode writeMode; private final TTLOption ttlOption; private final TimestampOption timestampOption; + private final String lowestCassandraVersion; + private final boolean quoteIdentifiers; - public TableSchema(StructType dfSchema, TableInfoProvider tableInfo, WriteMode writeMode, - TTLOption ttlOption, TimestampOption timestampOption) + public TableSchema(StructType dfSchema, + TableInfoProvider tableInfo, + WriteMode writeMode, + TTLOption ttlOption, + TimestampOption timestampOption, + String lowestCassandraVersion, + boolean quoteIdentifiers) { this.writeMode = writeMode; this.ttlOption = ttlOption; this.timestampOption = timestampOption; + this.lowestCassandraVersion = lowestCassandraVersion; + this.quoteIdentifiers = quoteIdentifiers; validateDataFrameCompatibility(dfSchema, tableInfo); validateNoSecondaryIndexes(tableInfo); @@ -123,7 +136,7 @@ public class TableSchema implements Serializable CqlField.CqlType cqlType = tableInfo.getColumnType(fieldName); return SqlToCqlTypeConverter.getConverter(cqlType); }) - .collect(Collectors.toList()); + .collect(Collectors.toList()); } private static List<ColumnType<?>> getPartitionKeyColumnTypes(TableInfoProvider tableInfo) @@ -156,35 +169,44 @@ public class TableSchema implements Serializable } } - private static String getInsertStatement(StructType dfSchema, TableInfoProvider tableInfo, - TTLOption ttlOption, TimestampOption timestampOption) + private String getInsertStatement(StructType dfSchema, + TableInfoProvider tableInfo, + TTLOption ttlOption, + TimestampOption timestampOption) { + CassandraBridge bridge = CassandraBridgeFactory.get(lowestCassandraVersion); List<String> columnNames = 
Arrays.stream(dfSchema.fieldNames()) .filter(fieldName -> !fieldName.equals(ttlOption.columnName())) .filter(fieldName -> !fieldName.equals(timestampOption.columnName())) .collect(Collectors.toList()); StringBuilder stringBuilder = new StringBuilder("INSERT INTO ") - .append(tableInfo.getKeyspaceName()) - .append(".").append(tableInfo.getName()) - .append(columnNames.stream().collect(Collectors.joining(",", " (", ") "))) - .append("VALUES") - .append(columnNames.stream().map(columnName -> ":" + columnName).collect(Collectors.joining(",", " (", ")"))); + .append(maybeQuotedIdentifier(bridge, quoteIdentifiers, tableInfo.getKeyspaceName())) + .append(".") + .append(maybeQuotedIdentifier(bridge, quoteIdentifiers, tableInfo.getName())) + .append(columnNames.stream() + .map(columnName -> maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName)) + .collect(Collectors.joining(",", " (", ") "))); + + stringBuilder.append("VALUES") + .append(columnNames.stream() + .map(columnName -> ":" + maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName)) + .collect(Collectors.joining(",", " (", ")"))); if (ttlOption.withTTl() && timestampOption.withTimestamp()) { stringBuilder.append(" USING TIMESTAMP ") - .append(timestampOption) + .append(timestampOption.toCQLString(columnName -> maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName))) .append(" AND TTL ") - .append(ttlOption); + .append(ttlOption.toCQLString(columnName -> maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName))); } else if (timestampOption.withTimestamp()) { stringBuilder.append(" USING TIMESTAMP ") - .append(timestampOption); + .append(timestampOption.toCQLString(columnName -> maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName))); } else if (ttlOption.withTTl()) { stringBuilder.append(" USING TTL ") - .append(ttlOption); + .append(ttlOption.toCQLString(columnName -> maybeQuotedIdentifier(bridge, quoteIdentifiers, columnName))); } stringBuilder.append(";"); String insertStatement = 
stringBuilder.toString(); @@ -195,10 +217,11 @@ public class TableSchema implements Serializable private String getDeleteStatement(StructType dfSchema, TableInfoProvider tableInfo) { - Stream<String> fieldEqualityStatements = Arrays.stream(dfSchema.fieldNames()).map(key -> key + "=?"); + CassandraBridge bridge = CassandraBridgeFactory.get(lowestCassandraVersion); + Stream<String> fieldEqualityStatements = Arrays.stream(dfSchema.fieldNames()).map(key -> maybeQuotedIdentifier(bridge, quoteIdentifiers, key) + "=?"); String deleteStatement = String.format("DELETE FROM %s.%s where %s;", - tableInfo.getKeyspaceName(), - tableInfo.getName(), + maybeQuotedIdentifier(bridge, quoteIdentifiers, tableInfo.getKeyspaceName()), + maybeQuotedIdentifier(bridge, quoteIdentifiers, tableInfo.getName()), fieldEqualityStatements.collect(Collectors.joining(" AND "))); LOGGER.info("CQL delete statement for the RDD {}", deleteStatement); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java index 06307b9..c63dbe7 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java @@ -22,6 +22,7 @@ package org.apache.cassandra.spark.bulkwriter; import java.io.Serializable; import java.time.Duration; import java.time.temporal.ChronoUnit; +import java.util.function.Function; public final class TimestampOption implements Serializable { @@ -109,12 +110,11 @@ public final class TimestampOption implements Serializable && (timestampColumnName != null || timeStampInMicroSeconds != null); } - @Override - public String toString() + public String toCQLString(Function<String, String> maybeQuoteFunction) { if (timestampColumnName != null && !timestampColumnName.isEmpty()) { - return ":" + 
timestampColumnName; + return ":" + maybeQuoteFunction.apply(timestampColumnName); } if (timeStampInMicroSeconds != null) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java index d824f05..95c7876 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java @@ -45,5 +45,10 @@ public enum WriterOptions implements WriterOption ROW_BUFFER_MODE, SSTABLE_DATA_SIZE_IN_MB, TTL, - TIMESTAMP + TIMESTAMP, + /** + * Option that specifies whether the identifiers (i.e. keyspace, table name, column names) should be quoted to + * support mixed case and reserved keyword names for these fields. + */ + QUOTE_IDENTIFIERS, } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java index baf66ac..1c5b8df 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/util/SbwKryoRegistrator.java @@ -53,7 +53,7 @@ public class SbwKryoRegistrator implements KryoRegistrator { LOGGER.debug("Registering Spark Bulk Writer classes with Kryo which require use of Java Serializer"); // NOTE: The order of calls to `register` matters, so we sort by class name just to make sure we always - // register classess in the same order - HashSet doesn't guarantee its iteration order + // register classes in the same order - HashSet doesn't guarantee its iteration order javaSerializableClasses.stream() .sorted(Comparator.comparing(Class::getCanonicalName)) .forEach(javaSerializableClass -> 
kryo.register(javaSerializableClass, new SbwJavaSerializer())); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java index 2c22f3e..bc46a21 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java @@ -268,6 +268,17 @@ public class BulkSparkConfTest assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED); } + @Test + void testQuoteIdentifiers() + { + assertFalse(bulkSparkConf.quoteIdentifiers); + Map<String, String> options = copyDefaultOptions(); + options.put(WriterOptions.QUOTE_IDENTIFIERS.name(), "true"); + BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); + assertNotNull(bulkSparkConf); + assertTrue(bulkSparkConf.quoteIdentifiers); + } + private Map<String, String> copyDefaultOptions() { TreeMap<String, String> map = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index 19ba2d6..8aee396 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.tuple.Pair; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.sidecar.common.data.TimeSkewResponse; import 
org.apache.cassandra.spark.bulkwriter.token.CassandraRing; @@ -63,6 +64,14 @@ import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCq public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, JobInfo, SchemaInfo, DataTransferApi { private static final long serialVersionUID = -2912371629236770646L; + public static final String[] DEFAULT_PARTITION_KEY_COLUMNS = {"id", "date"}; + public static final String[] DEFAULT_PRIMARY_KEY_COLUMN_NAMES = {"id", "date"}; + public static final Pair<StructType, ImmutableMap<String, CqlField.CqlType>> DEFAULT_VALID_PAIR = + TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns( + new String[]{"id", "date", "course", "marks"}, + new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType, DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType}, + new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)}); + private final boolean quoteIdentifiers; private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED; private ConsistencyLevel.CL consistencyLevel; private int sstableDataSizeInMB = 128; @@ -90,35 +99,57 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo private CommitResultSupplier crSupplier = (uuids, dc) -> new RemoteCommitResult(true, Collections.emptyList(), uuids, null); private Predicate<CassandraInstance> uploadRequestConsumer = instance -> true; - private TTLOption ttlOption = TTLOption.forever(); + + public MockBulkWriterContext(CassandraRing<RingInstance> ring) + { + this(ring, + DEFAULT_CASSANDRA_VERSION, + ConsistencyLevel.CL.LOCAL_QUORUM, + DEFAULT_VALID_PAIR, + DEFAULT_PARTITION_KEY_COLUMNS, + DEFAULT_PRIMARY_KEY_COLUMN_NAMES, + false); + } public MockBulkWriterContext(CassandraRing<RingInstance> ring, String cassandraVersion, ConsistencyLevel.CL consistencyLevel) { + this(ring, cassandraVersion, consistencyLevel, DEFAULT_VALID_PAIR, DEFAULT_PARTITION_KEY_COLUMNS, 
DEFAULT_PRIMARY_KEY_COLUMN_NAMES, false); + } + + public MockBulkWriterContext(CassandraRing<RingInstance> ring, + String cassandraVersion, + ConsistencyLevel.CL consistencyLevel, + Pair<StructType, ImmutableMap<String, CqlField.CqlType>> validPair, + String[] partitionKeyColumns, + String[] primaryKeyColumnNames, + boolean quoteIdentifiers) + { + this.quoteIdentifiers = quoteIdentifiers; this.ring = ring; this.tokenPartitioner = new TokenPartitioner(ring, 1, 2, 2, false); this.cassandraVersion = cassandraVersion; this.consistencyLevel = consistencyLevel; - validPair = TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns( - new String[]{"id", "date", "course", "marks"}, - new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType, DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType}, - new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)}); - StructType validDataFrameSchema = validPair.getKey(); - ImmutableMap<String, CqlField.CqlType> validCqlColumns = validPair.getValue(); - String[] partitionKeyColumns = {"id", "date"}; - String[] primaryKeyColumnNames = {"id", "date"}; + this.validPair = validPair; + StructType validDataFrameSchema = this.validPair.getKey(); + ImmutableMap<String, CqlField.CqlType> validCqlColumns = this.validPair.getValue(); ColumnType<?>[] partitionKeyColumnTypes = {ColumnTypes.INT, ColumnTypes.INT}; - this.schema = new TableSchemaTestCommon.MockTableSchemaBuilder() - .withCqlColumns(validCqlColumns) - .withPartitionKeyColumns(partitionKeyColumns) - .withPrimaryKeyColumnNames(primaryKeyColumnNames) - .withCassandraVersion(cassandraVersion) - .withPartitionKeyColumnTypes(partitionKeyColumnTypes) - .withWriteMode(WriteMode.INSERT) - .withDataFrameSchema(validDataFrameSchema) - .withTTLSetting(ttlOption) - .build(); + TTLOption ttlOption = TTLOption.forever(); + TableSchemaTestCommon.MockTableSchemaBuilder builder = new 
TableSchemaTestCommon.MockTableSchemaBuilder(CassandraBridgeFactory.get(cassandraVersion)) + .withCqlColumns(validCqlColumns) + .withPartitionKeyColumns(partitionKeyColumns) + .withPrimaryKeyColumnNames(primaryKeyColumnNames) + .withCassandraVersion(cassandraVersion) + .withPartitionKeyColumnTypes(partitionKeyColumnTypes) + .withWriteMode(WriteMode.INSERT) + .withDataFrameSchema(validDataFrameSchema) + .withTTLSetting(ttlOption); + if (quoteIdentifiers()) + { + builder.withQuotedIdentifiers(); + } + this.schema = builder.build(); this.jobId = java.util.UUID.randomUUID(); } @@ -132,11 +163,6 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo this.timeProvider = timeProvider; } - public MockBulkWriterContext(CassandraRing<RingInstance> ring) - { - this(ring, DEFAULT_CASSANDRA_VERSION, ConsistencyLevel.CL.LOCAL_QUORUM); - } - @Override public void shutdown() { @@ -412,9 +438,21 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo } @Override - public String getFullTableName() + public boolean quoteIdentifiers() + { + return quoteIdentifiers; + } + + @Override + public String keyspace() + { + return "keyspace"; + } + + @Override + public String tableName() { - return "keyspace.table"; + return "table"; } // Startup Validation diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java index 940f5a2..90d14fc 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java @@ -29,7 +29,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; +import 
org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -37,9 +39,19 @@ import org.junit.jupiter.api.io.TempDir; import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.spark.bulkwriter.token.CassandraRing; +import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.common.model.CassandraInstance; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; import scala.Tuple2; +import static org.apache.cassandra.spark.bulkwriter.MockBulkWriterContext.DEFAULT_CASSANDRA_VERSION; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.VARCHAR; +import static org.apache.cassandra.spark.bulkwriter.TableSchemaTestCommon.mockCqlType; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.matchesPattern; @@ -84,21 +96,42 @@ public class RecordWriterTest validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); } + @Test + public void testWriteWithMixedCaseColumnNames() + { + boolean quoteIdentifiers = true; + String[] pk = {"ID", "date"}; + String[] columnNames = {"ID", "date", "course", "limit"}; + + Pair<StructType, ImmutableMap<String, CqlField.CqlType>> validPair = TableSchemaTestCommon.buildMatchedDataframeAndCqlColumns( + columnNames, + new DataType[]{DataTypes.IntegerType, DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType}, + new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)}); + + MockBulkWriterContext writerContext = new MockBulkWriterContext(ring, + 
DEFAULT_CASSANDRA_VERSION, + ConsistencyLevel.CL.LOCAL_QUORUM, + validPair, + pk, + pk, + quoteIdentifiers); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true); + validateSuccessfulWrite(writerContext, data, columnNames); + } + @Test public void testWriteWithConstantTTL() { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false); - validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES); + validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); } @Test public void testWriteWithTTLColumn() { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring); Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false); String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"}; - validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl); + validateSuccessfulWrite(writerContext, data, columnNamesWithTtl); } @Test diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java index 1c00d83..5cb184f 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Test; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.spark.common.schema.ColumnType; import org.apache.cassandra.spark.common.schema.ColumnTypes; import org.apache.cassandra.spark.data.CqlField; @@ -67,7 +68,7 @@ public class TableSchemaTest { TableSchema schema = getValidSchemaBuilder() .build(); - assertThat(schema.modificationStatement, + 
assertThat(trimUniqueTableName(schema.modificationStatement), is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks);"))); } @@ -75,14 +76,16 @@ public class TableSchemaTest public void testInsertStatementWithConstantTTL() { TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("1000")).build(); - assertThat(schema.modificationStatement, is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 1000;"))); + assertThat(trimUniqueTableName(schema.modificationStatement), + is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 1000;"))); } @Test public void testInsertStatementWithTTLColumn() { TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).build(); - assertThat(schema.modificationStatement, is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL :ttl;"))); + assertThat(trimUniqueTableName(schema.modificationStatement), + is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL :ttl;"))); } @Test @@ -90,7 +93,7 @@ public class TableSchemaTest { TableSchema schema = getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("1000")).build(); String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP 1000;"; - assertThat(schema.modificationStatement, is(equalTo(expectedQuery))); + assertThat(trimUniqueTableName(schema.modificationStatement), is(equalTo(expectedQuery))); } @Test @@ -98,14 +101,14 @@ public class TableSchemaTest { TableSchema schema = getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("timestamp")).build(); String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp;"; - assertThat(schema.modificationStatement, is(equalTo(expectedQuery))); + 
assertThat(trimUniqueTableName(schema.modificationStatement), is(equalTo(expectedQuery))); } @Test public void testInsertStatementWithTTLAndTimestampColumn() { TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).withTimeStampSetting(TimestampOption.from("timestamp")).build(); String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp AND TTL :ttl;"; - assertThat(schema.modificationStatement, is(equalTo(expectedQuery))); + assertThat(trimUniqueTableName(schema.modificationStatement), is(equalTo(expectedQuery))); } @Test @@ -120,7 +123,7 @@ public class TableSchemaTest TableSchema schema = getValidSchemaBuilder() .withWriteMode(WriteMode.DELETE_PARTITION) .build(); - assertThat(schema.modificationStatement, is(equalTo("DELETE FROM test.test where id=?;"))); + assertThat(trimUniqueTableName(schema.modificationStatement), is(equalTo("DELETE FROM test.test where id=?;"))); } @Test @@ -222,7 +225,7 @@ public class TableSchemaTest private TableSchemaTestCommon.MockTableSchemaBuilder getValidSchemaBuilder() { - return new TableSchemaTestCommon.MockTableSchemaBuilder() + return new TableSchemaTestCommon.MockTableSchemaBuilder(CassandraBridgeFactory.get(cassandraVersion)) .withCqlColumns(validCqlColumns) .withPartitionKeyColumns(partitionKeyColumns) .withPrimaryKeyColumnNames(primaryKeyColumnNames) @@ -231,4 +234,10 @@ public class TableSchemaTest .withWriteMode(WriteMode.INSERT) .withDataFrameSchema(validDataFrameSchema); } + + // Removes the unique table name to make validation consistent + private static String trimUniqueTableName(String statement) + { + return statement.replaceAll("test.test_table_\\d+", "test.test"); + } } diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java index 18ce2a6..ef937ce 
100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import com.google.common.base.Preconditions; @@ -30,6 +31,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.commons.lang3.tuple.Pair; +import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.spark.common.schema.ColumnType; import org.apache.cassandra.spark.data.CqlField; import org.apache.spark.sql.types.DataTypes; @@ -131,12 +134,13 @@ public final class TableSchemaTestCommon Pair<StructType, ImmutableMap<String, CqlField.CqlType>> pair = buildMatchedDataframeAndCqlColumns(fieldNames, sparkTypes, driverTypes); ImmutableMap<String, CqlField.CqlType> cqlColumns = pair.getValue(); StructType dataFrameSchema = pair.getKey(); + String cassandraVersion = "4.0.0"; return - new MockTableSchemaBuilder() + new MockTableSchemaBuilder(CassandraBridgeFactory.get(cassandraVersion)) .withCqlColumns(cqlColumns) .withPartitionKeyColumns(partitionKeyColumns) .withPrimaryKeyColumnNames(primaryKeyColumnNames) - .withCassandraVersion("3.0.24.8") + .withCassandraVersion(cassandraVersion) .withPartitionKeyColumnTypes(partitionKeyColumnTypes) .withWriteMode(WriteMode.INSERT) .withDataFrameSchema(dataFrameSchema) @@ -145,6 +149,7 @@ public final class TableSchemaTestCommon public static class MockTableSchemaBuilder { + private final CassandraBridge bridge; private ImmutableMap<String, CqlField.CqlType> cqlColumns; private String[] partitionKeyColumns; private String[] primaryKeyColumnNames; @@ -154,11 +159,17 @@ public final 
class TableSchemaTestCommon private WriteMode writeMode = null; private TTLOption ttlOption = TTLOption.forever(); private TimestampOption timestampOption = TimestampOption.now(); + private boolean quoteIdentifiers = false; + + public MockTableSchemaBuilder(CassandraBridge bridge) + { + this.bridge = bridge; + } public MockTableSchemaBuilder withCqlColumns(@NotNull Map<String, CqlField.CqlType> cqlColumns) { Preconditions.checkNotNull(cqlColumns, "cqlColumns cannot be null"); - Preconditions.checkArgument(cqlColumns.size() > 0, "cqlColumns cannot be empty"); + Preconditions.checkArgument(!cqlColumns.isEmpty(), "cqlColumns cannot be empty"); this.cqlColumns = ImmutableMap.copyOf(cqlColumns); return this; } @@ -182,7 +193,7 @@ public final class TableSchemaTestCommon public MockTableSchemaBuilder withCassandraVersion(@NotNull String cassandraVersion) { Preconditions.checkNotNull(cassandraVersion, "cassandraVersion cannot be null"); - Preconditions.checkArgument(cassandraVersion.length() > 0, "cassandraVersion cannot be an empty string"); + Preconditions.checkArgument(!cassandraVersion.isEmpty(), "cassandraVersion cannot be an empty string"); this.cassandraVersion = cassandraVersion; return this; } @@ -222,6 +233,12 @@ public final class TableSchemaTestCommon return this; } + public MockTableSchemaBuilder withQuotedIdentifiers() + { + this.quoteIdentifiers = true; + return this; + } + public TableSchema build() { Objects.requireNonNull(cqlColumns, @@ -238,11 +255,13 @@ public final class TableSchemaTestCommon "writeMode cannot be null. Please provide the write mode by calling #withWriteMode"); Objects.requireNonNull(dataFrameSchema, "dataFrameSchema cannot be null. 
Please provide the write mode by calling #withDataFrameSchema"); - MockTableInfoProvider tableInfoProvider = new MockTableInfoProvider(cqlColumns, + MockTableInfoProvider tableInfoProvider = new MockTableInfoProvider(bridge, + cqlColumns, partitionKeyColumns, partitionKeyColumnTypes, primaryKeyColumnNames, - cassandraVersion); + cassandraVersion, + quoteIdentifiers); if (ttlOption.withTTl() && ttlOption.columnName() != null) { dataFrameSchema = dataFrameSchema.add("ttl", DataTypes.IntegerType); @@ -251,31 +270,41 @@ public final class TableSchemaTestCommon { dataFrameSchema = dataFrameSchema.add("timestamp", DataTypes.IntegerType); } - return new TableSchema(dataFrameSchema, tableInfoProvider, writeMode, ttlOption, timestampOption); + return new TableSchema(dataFrameSchema, tableInfoProvider, writeMode, ttlOption, timestampOption, cassandraVersion, quoteIdentifiers); } } public static class MockTableInfoProvider implements TableInfoProvider { + public static final String TEST_TABLE_PREFIX = "test_table_"; + public static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0); + private final CassandraBridge bridge; private final ImmutableMap<String, CqlField.CqlType> cqlColumns; private final String[] partitionKeyColumns; private final ColumnType[] partitionKeyColumnTypes; private final String[] primaryKeyColumnNames; + private final String uniqueTableName; Map<String, CqlField.CqlType> columns; private final String cassandraVersion; + private final boolean quoteIdentifiers; - public MockTableInfoProvider(ImmutableMap<String, CqlField.CqlType> cqlColumns, + public MockTableInfoProvider(CassandraBridge bridge, + ImmutableMap<String, CqlField.CqlType> cqlColumns, String[] partitionKeyColumns, ColumnType[] partitionKeyColumnTypes, String[] primaryKeyColumnNames, - String cassandraVersion) + String cassandraVersion, + boolean quoteIdentifiers) { + this.bridge = bridge; this.cqlColumns = cqlColumns; this.partitionKeyColumns = partitionKeyColumns; 
this.partitionKeyColumnTypes = partitionKeyColumnTypes; this.primaryKeyColumnNames = primaryKeyColumnNames; columns = cqlColumns; this.cassandraVersion = cassandraVersion.replaceAll("(\\w+-)*cassandra-", ""); + this.quoteIdentifiers = quoteIdentifiers; + this.uniqueTableName = TEST_TABLE_PREFIX + TEST_TABLE_ID.getAndIncrement(); } @Override @@ -306,9 +335,9 @@ public final class TableSchemaTestCommon public String getCreateStatement() { String keyDef = getKeyDef(); - String createStatement = "CREATE TABLE test.test (" + cqlColumns.entrySet() + String createStatement = "CREATE TABLE test." + uniqueTableName + " (" + cqlColumns.entrySet() .stream() - .map(column -> column.getKey() + " " + column.getValue().name()) + .map(column -> maybeQuoteIdentifierIfRequested(column.getKey()) + " " + column.getValue().name()) .collect(Collectors.joining(",\n")) + ", " + keyDef + ") " + "WITH COMPRESSION = {'class': '" + getCompression() + "'};"; System.out.println("Create Table:" + createStatement); @@ -333,8 +362,12 @@ public final class TableSchemaTestCommon List<String> partitionColumns = Lists.newArrayList(partitionKeyColumns); List<String> primaryColumns = Lists.newArrayList(primaryKeyColumnNames); primaryColumns.removeAll(partitionColumns); - String partitionKey = "(" + String.join(",", partitionKeyColumns) + ")"; - String clusteringKey = String.join(",", primaryColumns); + String partitionKey = Arrays.stream(partitionKeyColumns) + .map(this::maybeQuoteIdentifierIfRequested) + .collect(Collectors.joining(",", "(", ")")); + String clusteringKey = primaryColumns.stream() + .map(this::maybeQuoteIdentifierIfRequested) + .collect(Collectors.joining(",")); return "PRIMARY KEY (" + partitionKey + clusteringKey + ")"; } @@ -347,7 +380,7 @@ public final class TableSchemaTestCommon @Override public String getName() { - return "test"; + return uniqueTableName; } @Override @@ -367,5 +400,12 @@ public final class TableSchemaTestCommon { return cqlColumns.keySet().asList(); } + + private 
String maybeQuoteIdentifierIfRequested(String identifier) + { + return quoteIdentifiers + ? bridge.maybeQuoteIdentifier(identifier) + : identifier; + } } } diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java index d346ef3..eb4beb1 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java @@ -39,7 +39,6 @@ import org.junit.jupiter.api.BeforeEach; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.Session; import com.google.inject.Guice; import com.google.inject.Injector; @@ -49,6 +48,7 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; import io.vertx.junit5.VertxTestContext; import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; @@ -68,7 +68,7 @@ import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSAND public abstract class IntegrationTestBase { private static final int MAX_KEYSPACE_TABLE_WAIT_ATTEMPTS = 100; - private static final long MAX_KEYSPACE_TABLE_TIME = 100L; + private static final long MAX_KEYSPACE_TABLE_TIME = 200L; protected Logger logger = LoggerFactory.getLogger(this.getClass()); protected Vertx vertx; protected Server server; @@ -229,41 +229,37 @@ public abstract class IntegrationTestBase } /** - * Waits for the specified keyspace/table to be available. 
+ * Waits for the specified keyspace to be available in Sidecar. * Empirically, this loop usually executes either zero or one time before completing. * However, we set a fairly high number of retries to account for variability in build machines. * - * @param keyspaceName the keyspace for which to wait - * @param tableName the table in the keyspace for which to wait + * @param keyspace the keyspace for which to wait */ - protected void waitForKeyspaceAndTable(String keyspaceName, String tableName) + protected void waitUntilSidecarPicksUpSchemaChange(String keyspace) { - int numInstances = sidecarTestContext.instancesConfig().instances().size(); int retries = MAX_KEYSPACE_TABLE_WAIT_ATTEMPTS; - boolean sidecarMissingSchema = true; - while (sidecarMissingSchema && retries-- > 0) + WebClient client = WebClient.create(vertx); + while (retries-- > 0) { - sidecarMissingSchema = false; - for (int i = 0; i < numInstances; i++) + try { - KeyspaceMetadata keyspace = sidecarTestContext.session(i) - .getCluster() - .getMetadata() - .getKeyspace(keyspaceName); - sidecarMissingSchema |= (keyspace == null || keyspace.getTable(tableName) == null); + client.get(server.actualPort(), "localhost", "/api/v1/keyspaces/" + keyspace + "/schema") + .expect(ResponsePredicate.SC_OK) + .send() + .toCompletionStage() + .toCompletableFuture() + .get(MAX_KEYSPACE_TABLE_TIME, TimeUnit.MILLISECONDS); + logger.info("Schema is ready in Sidecar"); + client.close(); + return; } - if (sidecarMissingSchema) + catch (Exception exception) { - logger.info("Keyspace/table {}/{} not yet available - waiting...", keyspaceName, tableName); + logger.info("Waiting for schema to propagate to Sidecar"); Uninterruptibles.sleepUninterruptibly(MAX_KEYSPACE_TABLE_TIME, TimeUnit.MILLISECONDS); } - else - { - return; - } } - throw new RuntimeException( - String.format("Keyspace/table %s/%s did not become visible on all sidecar instances", - keyspaceName, tableName)); + client.close(); + throw new 
RuntimeException(String.format("Keyspace %s did not become visible in Sidecar", keyspace)); } } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java index 59007a1..0ee434d 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteTtlTest.java @@ -64,7 +64,7 @@ public class BulkWriteTtlTest extends IntegrationTestBase + " marks BIGINT\n" + " ) WITH default_time_to_live = 1;" ); - waitForKeyspaceAndTable(keyspace, table); + waitUntilSidecarPicksUpSchemaChange(keyspace); boolean addTTLColumn = false; boolean addTimestampColumn = false; IntegrationTestJob.builder((recordNum) -> generateCourse(recordNum, null, null), @@ -96,7 +96,7 @@ public class BulkWriteTtlTest extends IntegrationTestBase + " marks BIGINT\n" + " );" ); - waitForKeyspaceAndTable(keyspace, table); + waitUntilSidecarPicksUpSchemaChange(keyspace); Map<String, String> writerOptions = ImmutableMap.of(WriterOptions.TTL.name(), TTLOption.constant(1)); boolean addTTLColumn = false; boolean addTimestampColumn = false; @@ -129,7 +129,7 @@ public class BulkWriteTtlTest extends IntegrationTestBase + " marks BIGINT\n" + " );" ); - waitForKeyspaceAndTable(keyspace, table); + waitUntilSidecarPicksUpSchemaChange(keyspace); Map<String, String> writerOptions = ImmutableMap.of(WriterOptions.TTL.name(), TTLOption.perRow("ttl")); boolean addTTLColumn = true; boolean addTimestampColumn = false; diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java similarity index 80% rename from 
cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersTest.java rename to cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java index c2aa4be..0cd3444 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersReadTest.java @@ -22,17 +22,11 @@ package org.apache.cassandra.analytics; import java.util.Arrays; import java.util.Comparator; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.Uninterruptibles; import org.junit.jupiter.api.extension.ExtendWith; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.predicate.ResponsePredicate; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; import org.apache.cassandra.distributed.api.ConsistencyLevel; @@ -45,22 +39,22 @@ import static org.assertj.core.api.Assertions.assertThat; /** * Tests the bulk reader behavior when requiring quoted identifiers for keyspace, table name, and column names. - * These tests exercise a full integration test, which includes testing Sidecar behavior when dealing with quoted + * + * <p>These tests exercise a full integration test, which includes testing Sidecar behavior when dealing with quoted * identifiers. 
*/ @ExtendWith(VertxExtension.class) -class QuoteIdentifiersTest extends SparkIntegrationTestBase +class QuoteIdentifiersReadTest extends SparkIntegrationTestBase { - private static final Logger LOGGER = LoggerFactory.getLogger(QuoteIdentifiersTest.class); - @CassandraIntegrationTest(nodesPerDc = 3, gossip = true) + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) void testMixedCaseKeyspace(VertxTestContext context) { QualifiedName qualifiedTableName = uniqueTestTableFullName("QuOtEd_KeYsPaCe"); runTestScenario(context, qualifiedTableName); } - @CassandraIntegrationTest(nodesPerDc = 3, gossip = true) + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) void testReservedWordKeyspace(VertxTestContext context) { // keyspace is a reserved word @@ -68,27 +62,27 @@ class QuoteIdentifiersTest extends SparkIntegrationTestBase runTestScenario(context, qualifiedTableName); } - @CassandraIntegrationTest(nodesPerDc = 3, gossip = true) + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) void testMixedCaseTable(VertxTestContext context) { QualifiedName qualifiedTableName = uniqueTestTableFullName(TEST_KEYSPACE, "QuOtEd_TaBlE"); runTestScenario(context, qualifiedTableName); } - @CassandraIntegrationTest(nodesPerDc = 3, gossip = true) + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) void testReservedWordTable(VertxTestContext context) { // table is a reserved word runTestScenario(context, new QualifiedName(TEST_KEYSPACE, "table")); } - @CassandraIntegrationTest(nodesPerDc = 3, gossip = true) + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) void testReadComplexSchema(VertxTestContext context) { QualifiedName tableName = uniqueTestTableFullName("QuOtEd_KeYsPaCe", "QuOtEd_TaBlE"); String quotedKeyspace = tableName.maybeQuotedKeyspace(); - createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 3)); + createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 1)); // Create UDT String createUdtQuery = "CREATE TYPE " + 
quotedKeyspace + ".\"UdT1\" (\"TimE\" bigint, \"limit\" int);"; @@ -129,7 +123,7 @@ class QuoteIdentifiersTest extends SparkIntegrationTestBase { String quotedKeyspace = tableName.maybeQuotedKeyspace(); - createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 3)); + createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 1)); createTestTable(String.format("CREATE TABLE IF NOT EXISTS %s (\"IdEnTiFiEr\" text, IdEnTiFiEr int, PRIMARY KEY(\"IdEnTiFiEr\"));", tableName)); List<String> dataset = Arrays.asList("a", "b", "c", "d", "e", "f", "g"); @@ -180,29 +174,4 @@ class QuoteIdentifiersTest extends SparkIntegrationTestBase .execute(query, ConsistencyLevel.ALL); } } - - void waitUntilSidecarPicksUpSchemaChange(String quotedKeyspace) - { - WebClient client = WebClient.create(vertx); - while (true) - { - try - { - client.get(server.actualPort(), "localhost", "/api/v1/keyspaces/" + quotedKeyspace + "/schema") - .expect(ResponsePredicate.SC_OK) - .send() - .toCompletionStage() - .toCompletableFuture() - .get(30, TimeUnit.SECONDS); - LOGGER.info("Schema is ready in Sidecar"); - break; - } - catch (Exception exception) - { - LOGGER.info("Waiting for schema to propagate to Sidecar"); - Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); - } - } - client.close(); - } } diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java new file mode 100644 index 0000000..63b947a --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/QuoteIdentifiersWriteTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.junit5.VertxExtension; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructType; + +import static org.apache.spark.sql.types.DataTypes.BinaryType; +import static org.apache.spark.sql.types.DataTypes.LongType; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests the bulk writer behavior when requiring quoted identifiers for keyspace, table name, and column names. 
+ * + * <p>These tests exercise a full integration test, which includes testing Sidecar behavior when dealing with quoted + * identifiers. + */ +@ExtendWith(VertxExtension.class) +class QuoteIdentifiersWriteTest extends SparkIntegrationTestBase +{ + static final int ROW_COUNT = 10_000; + + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) + void testWriteWithMixedCaseKeyspaceName() + { + QualifiedName qualifiedTableName = uniqueTestTableFullName("QuOtEd_KeYsPaCe"); + runWriteTestScenario(qualifiedTableName); + } + + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) + void testWriteWithReservedWordKeyspaceName() + { + // keyspace is a reserved word + QualifiedName qualifiedTableName = uniqueTestTableFullName("keyspace"); + runWriteTestScenario(qualifiedTableName); + } + + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) + void testWriteWithMixedCaseTableName() + { + QualifiedName qualifiedTableName = uniqueTestTableFullName(TEST_KEYSPACE, "QuOtEd_TaBlE"); + runWriteTestScenario(qualifiedTableName); + } + + @CassandraIntegrationTest(nodesPerDc = 1, gossip = true) + void testWriteWithReservedWordTableName() + { + // table is a reserved word + runWriteTestScenario(new QualifiedName(TEST_KEYSPACE, "table")); + } + + void runWriteTestScenario(QualifiedName tableName) + { + String quotedKeyspace = tableName.maybeQuotedKeyspace(); + createTestKeyspace(quotedKeyspace, ImmutableMap.of("datacenter1", 1)); + createTestTable(String.format("CREATE TABLE IF NOT EXISTS %s (\"IdEnTiFiEr\" bigint, course blob, \"limit\" bigint, PRIMARY KEY(\"IdEnTiFiEr\"));", + tableName)); + waitUntilSidecarPicksUpSchemaChange(tableName.maybeQuotedKeyspace()); + + SparkSession spark = getOrCreateSparkSession(); + SparkContext sc = spark.sparkContext(); + JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc); + SQLContext sql = spark.sqlContext(); + + int parallelism = sc.defaultParallelism(); + JavaRDD<Row> rows = genDataset(javaSparkContext, ROW_COUNT, 
parallelism); + Dataset<Row> df = sql.createDataFrame(rows, writeSchema()); + + bulkWriterDataFrameWriter(df, tableName).option("quote_identifiers", "true") + .save(); + validateWrites(tableName, rows); + } + + private void validateWrites(QualifiedName tableName, JavaRDD<Row> rowsWritten) + { + // build a set of entries read from Cassandra into a set + Set<String> actualEntries = Arrays.stream(sidecarTestContext.cassandraTestContext() + .cluster() + .coordinator(1) + .execute(String.format("SELECT * FROM %s;", tableName), ConsistencyLevel.LOCAL_QUORUM)) + .map((Object[] columns) -> String.format("%s:%s:%s", + new String(((ByteBuffer) columns[1]).array(), StandardCharsets.UTF_8), + columns[0], + columns[2])) + .collect(Collectors.toSet()); + + // Number of entries in Cassandra must match the original datasource + assertThat(actualEntries.size()).isEqualTo(rowsWritten.count()); + + // remove from actual entries to make sure that the data read is the same as the data written + rowsWritten.collect() + .forEach(row -> { + String key = String.format("%s:%d:%d", + new String((byte[]) row.get(0), StandardCharsets.UTF_8), + row.getLong(1), + row.getLong(2)); + assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries") + .isTrue(); + }); + + // If this fails, it means there was more data in the database than we expected + assertThat(actualEntries).as("All entries are expected to be read from database") + .isEmpty(); + } + + static StructType writeSchema() + { + return new StructType() + .add("course", BinaryType, false) + .add("IdEnTiFiEr", LongType, false) // case-sensitive struct + .add("limit", LongType, false); // limit is a reserved word in Cassandra + } + + private static JavaRDD<Row> genDataset(JavaSparkContext sc, int recordCount, Integer parallelism) + { + long recordsPerPartition = recordCount / parallelism; + long remainder = recordCount - (recordsPerPartition * parallelism); + List<Integer> seq = IntStream.range(0, 
parallelism).boxed().collect(Collectors.toList()); + return sc.parallelize(seq, parallelism).mapPartitionsWithIndex( + (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> { + long firstRecordNumber = index * recordsPerPartition; + long recordsToGenerate = index.equals(parallelism) ? remainder : recordsPerPartition; + return LongStream.range(0, recordsToGenerate).mapToObj(offset -> { + long limit = firstRecordNumber + offset; + return RowFactory.create(("course-" + limit).getBytes(), limit, limit); + }).iterator(); + }, false); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkBulkWriterSimpleTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkBulkWriterSimpleTest.java index 916b243..cd16930 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkBulkWriterSimpleTest.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkBulkWriterSimpleTest.java @@ -57,7 +57,7 @@ public class SparkBulkWriterSimpleTest extends IntegrationTestBase + " course BLOB,\n" + " marks BIGINT\n" + " );"); - waitForKeyspaceAndTable(keyspace, table); + waitUntilSidecarPicksUpSchemaChange(keyspace); Map<String, String> writerOptions = new HashMap<>(); // A constant timestamp and TTL can be used by adding the following options to the writerOptions map // writerOptions.put(WriterOptions.TTL.name(), TTLOption.constant(20)); diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java index 7d09844..a33bc78 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java +++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkIntegrationTestBase.java @@ -20,6 +20,7 @@ package org.apache.cassandra.analytics; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.cassandra.sidecar.testing.IntegrationTestBase; import org.apache.cassandra.sidecar.testing.QualifiedName; @@ -28,6 +29,9 @@ import org.apache.cassandra.spark.bulkwriter.BulkSparkConf; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.DataFrameReader; +import org.apache.spark.sql.DataFrameWriter; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; @@ -52,7 +56,7 @@ public class SparkIntegrationTestBase extends IntegrationTestBase int numCores = coresPerExecutor * numExecutors; return sql.read().format("org.apache.cassandra.spark.sparksql.CassandraDataSource") - .option("sidecar_instances", "localhost,localhost2,localhost3") + .option("sidecar_instances", "localhost") .option("keyspace", tableName.keyspace()) // unquoted .option("table", tableName.table()) // unquoted .option("DC", "datacenter1") @@ -64,12 +68,27 @@ public class SparkIntegrationTestBase extends IntegrationTestBase .option("sidecar_port", server.actualPort()); } + protected DataFrameWriter<Row> bulkWriterDataFrameWriter(Dataset<Row> df, QualifiedName tableName) + { + String sidecarInstances = sidecarTestContext.instancesConfig().instances().stream().map(f -> f.host()).collect(Collectors.joining(",")); + return df.write() + .format("org.apache.cassandra.spark.sparksql.CassandraDataSink") + .option("sidecar_instances", sidecarInstances) + .option("keyspace", tableName.keyspace()) + .option("table", tableName.table()) + .option("local_dc", "datacenter1") + .option("bulk_writer_cl", "LOCAL_QUORUM") + .option("number_splits", "-1") + .option("sidecar_port", server.actualPort()) + .mode("append"); + } + 
protected SparkConf getOrCreateSparkConf() { if (sparkConf == null) { sparkConf = new SparkConf() - .setAppName("Integration test Spark Cassandra Bulk Reader Job") + .setAppName("Integration test Spark Cassandra Bulk Analytics Job") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Spark is not case-sensitive by default, but we want to make it case-sensitive for // the quoted identifiers tests where we test mixed case @@ -87,7 +106,7 @@ public class SparkIntegrationTestBase extends IntegrationTestBase { sparkSession = SparkSession .builder() - .config(sparkConf) + .config(getOrCreateSparkConf()) .getOrCreate(); } return sparkSession; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org