This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 9523a38  CASSANDRA-18605 Adding support for TTL & Timestamps for bulk writes
9523a38 is described below

commit 9523a38b3f1b5bc4313e2949896ddc1fff58afbe
Author: jkonisa <jkon...@apple.com>
AuthorDate: Thu Jun 15 13:31:01 2023 -0700

    CASSANDRA-18605 Adding support for TTL & Timestamps for bulk writes

    This commit introduces a new feature in Spark Bulk Writer to support
    writes with constant/per_row based TTL & Timestamps.

    Patch by Jyothsna Konisa; Reviewed by Dinesh Joshi, Francisco Guerrero,
    Yifan Cai for CASSANDRA-18605
---
 CHANGES.txt                                        |   3 +-
 .../spark/example/SampleCassandraJob.java          |  54 +++++++--
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  14 +++
 .../bulkwriter/CassandraBulkSourceRelation.java    |  26 ++--
 .../bulkwriter/CassandraBulkWriterContext.java     |   2 +-
 .../cassandra/spark/bulkwriter/RecordWriter.java   |  27 ++++-
 .../cassandra/spark/bulkwriter/SSTableWriter.java  |   5 +-
 .../spark/bulkwriter/SqlToCqlTypeConverter.java    |  10 ++
 .../cassandra/spark/bulkwriter/TTLOption.java      | 127 +++++++++++++++++++++
 .../cassandra/spark/bulkwriter/TableSchema.java    |  83 +++++++++++---
 .../spark/bulkwriter/TimestampOption.java          | 125 ++++++++++++++++++++
 .../cassandra/spark/bulkwriter/WriterOptions.java  |   4 +-
 .../spark/bulkwriter/MockBulkWriterContext.java    |   2 +
 .../spark/bulkwriter/MockTableWriter.java          |   5 +-
 .../spark/bulkwriter/RecordWriterTest.java         | 103 ++++++++++++++---
 .../spark/bulkwriter/SSTableWriterTest.java        |   3 +-
 .../bulkwriter/StreamSessionConsistencyTest.java   |   8 +-
 .../spark/bulkwriter/StreamSessionTest.java        |  24 ++--
 .../spark/bulkwriter/TableSchemaTest.java          |  39 ++++++-
 .../spark/bulkwriter/TableSchemaTestCommon.java    |  25 +++-
 .../org/apache/cassandra/bridge/SSTableWriter.java |   3 +-
 .../bridge/SSTableWriterImplementation.java        |   3 +-
 22 files changed, 600 insertions(+), 95 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3632517..eeaee1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Add support for TTL & Timestamps for bulk writes (CASSANDRA-18605)
  * Add circleci configuration yaml for Cassandra Analytics (CASSANDRA-18578)
  * Provide a SecretsProvider interface to abstract the secret provisioning (CASSANDRA-18545)
- * Add the .asf.yaml file (CASSANDRA-18548)
\ No newline at end of file
+ * Add the .asf.yaml file (CASSANDRA-18548)
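In practice the new options are wired into a bulk write as in the following
minimal sketch, assembled from the SampleCassandraJob changes below (the
Dataset<Row> df and the rest of the job setup are assumed; TTLOption.constant()
takes seconds, TimestampOption.constant() takes microseconds):

    // Sketch only -- option names and types come from the patch below.
    df.write()
      .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
      // Constant variant: one TTL / one write timestamp for every row in the job.
      .option(WriterOptions.TTL.name(), TTLOption.constant(20))
      .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
      // Per-row variant (use instead of the constants above): the DataFrame carries
      // extra "ttl"/"timestamp" columns supplying a value for each row.
      // .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
      // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
      .mode("append")
      .save();

With the per-row variant, TableSchema strips the TTL/timestamp columns from the
generated INSERT column list and appends USING TIMESTAMP :timestamp AND TTL :ttl,
as the TableSchemaTest expectations below show.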
diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
index 4e21749..58c9827 100644
--- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
+++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
@@ -28,6 +28,9 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.TimestampOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +49,7 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 
 import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.apache.spark.sql.types.DataTypes.LongType;
 
 /**
@@ -122,15 +126,12 @@ public final class SampleCassandraJob
 
     private static Dataset<Row> write(long rowCount, SparkConf sparkConf, SQLContext sql, SparkContext sc)
     {
-        StructType schema = new StructType()
-                .add("id", LongType, false)
-                .add("course", BinaryType, false)
-                .add("marks", LongType, false);
-
         JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc);
         int parallelism = sc.defaultParallelism();
-        JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism);
-        Dataset<Row> df = sql.createDataFrame(rows, schema);
+        boolean addTTLColumn = true;
+        boolean addTimestampColumn = true;
+        JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism, addTTLColumn, addTimestampColumn);
+        Dataset<Row> df = sql.createDataFrame(rows, getWriteSchema(addTTLColumn, addTimestampColumn));
 
         df.write()
           .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
@@ -140,6 +141,11 @@ public final class SampleCassandraJob
           .option("local_dc", "datacenter1")
           .option("bulk_writer_cl", "LOCAL_QUORUM")
           .option("number_splits", "-1")
+          // A constant timestamp and TTL can be used by setting the following options.
+          // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
+          // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
+          .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
+          .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
           .mode("append")
           .save();
         return df;
@@ -174,6 +180,23 @@ public final class SampleCassandraJob
         return df;
     }
 
+    private static StructType getWriteSchema(boolean addTTLColumn, boolean addTimestampColumn)
+    {
+        StructType schema = new StructType()
+                .add("id", LongType, false)
+                .add("course", BinaryType, false)
+                .add("marks", LongType, false);
+        if (addTTLColumn)
+        {
+            schema = schema.add("ttl", IntegerType, false);
+        }
+        if (addTimestampColumn)
+        {
+            schema = schema.add("timestamp", LongType, false);
+        }
+        return schema;
+    }
+
     private static void checkSmallDataFrameEquality(Dataset<Row> expected, Dataset<Row> actual)
     {
         if (actual == null)
@@ -186,11 +209,14 @@ public final class SampleCassandraJob
         }
     }
 
-    private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records, Integer parallelism)
+    private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records, Integer parallelism,
+                                           boolean addTTLColumn, boolean addTimestampColumn)
     {
         long recordsPerPartition = records / parallelism;
         long remainder = records - (recordsPerPartition * parallelism);
         List<Integer> seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList());
+        int ttl = 10;
+        long timeStamp = System.currentTimeMillis() * 1000;
         JavaRDD<Row> dataset = sc.parallelize(seq, parallelism).mapPartitionsWithIndex(
                 (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
                     long firstRecordNumber = index * recordsPerPartition;
@@ -201,6 +227,18 @@ public final class SampleCassandraJob
                     Integer courseNameStringLen = courseNameString.length();
                     Integer courseNameMultiplier = 1000 / courseNameStringLen;
                     byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
+                    if (addTTLColumn && addTimestampColumn)
+                    {
+                        return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
+                    }
+                    if (addTTLColumn)
+                    {
+                        return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
+                    }
+                    if (addTimestampColumn)
+                    {
+                        return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
+                    }
                     return RowFactory.create(recordNumber, courseName, recordNumber);
                 }).iterator();
         return rows;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 292282f..ee63df9 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -119,6 +119,8 @@ public class BulkSparkConf implements Serializable
     protected final String truststorePath;
     protected final String truststoreBase64Encoded;
     protected final String truststoreType;
+    protected final String ttl;
+    protected final String timestamp;
     protected final SparkConf conf;
     public final boolean validateSSTables;
     public final int commitThreadsPerInstance;
@@ -157,6 +159,8 @@ public class BulkSparkConf implements Serializable
         // else fall back to props, and then default if neither specified
         this.useOpenSsl = getBoolean(USE_OPENSSL, true);
         this.ringRetryCount = getInt(RING_RETRY_COUNT, DEFAULT_RING_RETRY_COUNT);
+        this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), null);
+        this.timestamp = MapUtils.getOrDefault(options, WriterOptions.TIMESTAMP.name(), null);
         validateEnvironment();
     }
 
@@ -241,6 +245,16 @@ public class BulkSparkConf implements Serializable
         return truststorePath;
     }
 
+    protected TTLOption getTTLOptions()
+    {
+        return TTLOption.from(ttl);
+    }
+
+    protected TimestampOption getTimestampOptions()
+    {
+        return TimestampOption.from(timestamp);
+    }
+
     protected String getTruststoreBase64Encoded()
     {
         return truststoreBase64Encoded;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 15d4371..8f2c0b5 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -19,8 +19,9 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.Serializable;
+import java.util.Iterator;
 
+import org.apache.spark.api.java.function.VoidFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +103,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
               .map(tableSchema::normalize)
               .keyBy(tokenizer::getDecoratedKey)
               .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
-        persist(sortedRDD);
+        persist(sortedRDD, data.columns());
     }
 
     private void cancelJob(@NotNull CancelJobEvent cancelJobEvent)
@@ -121,7 +122,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
     }
 
     @SuppressWarnings("RedundantCast")
-    private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD)
+    private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
     {
         writeValidator.setPhase("Environment Validation");
         writeValidator.validateInitialEnvironment();
@@ -129,7 +130,7 @@
         try
         {
-            sortedRDD.foreachPartition(new WriteIterator(broadcastContext)::call);
+            sortedRDD.foreachPartition(writeRowsInPartition(broadcastContext, columnNames));
             writeValidator.failIfRingChanged();
         }
         catch (Throwable throwable)
         {
@@ -168,18 +169,11 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
         }
     }
 
-    private static class WriteIterator implements Serializable
+    // Made this function static to avoid capturing reference to CassandraBulkSourceRelation object which cannot be
+    // serialized.
+    private static VoidFunction<Iterator<Tuple2<DecoratedKey, Object[]>>> writeRowsInPartition(Broadcast<BulkWriterContext> broadcastContext,
+                                                                                               String[] columnNames)
     {
-        private final Broadcast<BulkWriterContext> broadcastContext;
-
-        WriteIterator(Broadcast<BulkWriterContext> broadcastContext)
-        {
-            this.broadcastContext = broadcastContext;
-        }
-
-        public void call(java.util.Iterator<Tuple2<DecoratedKey, Object[]>> iterator)
-        {
-            new RecordWriter(broadcastContext.getValue()).write(iterator);
-        }
+        return itr -> new RecordWriter(broadcastContext.getValue(), columnNames).write(itr);
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 4813b1c..2709b51 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -91,7 +91,7 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
         CqlTable cqlTable = bridge.buildSchema(tableSchema, keyspace, replicationFactor, partitioner, udts, null, indexCount);
 
         TableInfoProvider tableInfoProvider = new CqlTableInfoProvider(tableSchema, cqlTable);
-        schemaInfo = new CassandraSchemaInfo(new TableSchema(dfSchema, tableInfoProvider, conf.writeMode));
+        schemaInfo = new CassandraSchemaInfo(new TableSchema(dfSchema, tableInfoProvider, conf.writeMode, conf.getTTLOptions(), conf.getTimestampOptions()));
     }
 
     public static BulkWriterContext fromOptions(@NotNull SparkContext sparkContext,
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index a5aaf8d..32b8632 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -27,8 +27,10 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
@@ -52,23 +54,26 @@ public class RecordWriter implements Serializable
     private static final Logger LOGGER = LoggerFactory.getLogger(RecordWriter.class);
 
     private final BulkWriterContext writerContext;
+    private final String[] columnNames;
     private Supplier<TaskContext> taskContextSupplier;
     private final BiFunction<BulkWriterContext, Path, SSTableWriter> tableWriterSupplier;
 
     private SSTableWriter sstableWriter = null;
     private int batchNumber = 0;
     private int batchSize = 0;
 
-    public RecordWriter(BulkWriterContext writerContext)
+    public RecordWriter(BulkWriterContext writerContext, String[] columnNames)
     {
-        this(writerContext, TaskContext::get, SSTableWriter::new);
+        this(writerContext, columnNames, TaskContext::get, SSTableWriter::new);
     }
 
     @VisibleForTesting
     RecordWriter(BulkWriterContext writerContext,
+                 String[] columnNames,
                  Supplier<TaskContext> taskContextSupplier,
                  BiFunction<BulkWriterContext, Path, SSTableWriter> tableWriterSupplier)
     {
         this.writerContext = writerContext;
+        this.columnNames = columnNames;
         this.taskContextSupplier = taskContextSupplier;
         this.tableWriterSupplier = tableWriterSupplier;
     }
@@ -99,12 +104,13 @@ public class RecordWriter implements Serializable
                                            Integer.toString(taskContext.stageAttemptNumber()),
                                            Integer.toString(taskContext.attemptNumber()),
                                            Integer.toString(partitionId));
+        Map<String, Object> valueMap = new HashMap<>();
         try
         {
             while (dataIterator.hasNext())
             {
                 maybeCreateTableWriter(partitionId, baseDir);
-                writeRow(dataIterator, partitionId, range);
+                writeRow(valueMap, dataIterator, partitionId, range);
                 checkBatchSize(streamSession, partitionId, job);
             }
 
@@ -138,7 +144,8 @@
         }
     }
 
-    public void writeRow(scala.collection.Iterator<Tuple2<DecoratedKey, Object[]>> dataIterator,
+    public void writeRow(Map<String, Object> valueMap,
+                         scala.collection.Iterator<Tuple2<DecoratedKey, Object[]>> dataIterator,
                          int partitionId,
                          Range<BigInteger> range) throws IOException
     {
@@ -149,7 +156,7 @@
                                     String.format("Received Token %s outside of expected range %s", token, range));
         try
         {
-            sstableWriter.addRow(token, tuple._2());
+            sstableWriter.addRow(token, getBindValuesForColumns(valueMap, columnNames, tuple._2()));
         }
         catch (RuntimeException exception)
         {
@@ -186,6 +193,16 @@
         }
     }
 
+    private static Map<String, Object> getBindValuesForColumns(Map<String, Object> map, String[] columnNames, Object[] values)
+    {
+        assert values.length == columnNames.length : "Number of values does not match the number of columns " + values.length + ", " + columnNames.length;
+        for (int i = 0; i < columnNames.length; i++)
+        {
+            map.put(columnNames[i], values[i]);
+        }
+        return map;
+    }
+
     private void finalizeSSTable(StreamSession streamSession,
                                  int partitionId,
                                  SSTableWriter sstableWriter,
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index bf982ef..df6f155 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -91,15 +91,14 @@ public class SSTableWriter
         return CASSANDRA_VERSION_PREFIX + lowestCassandraVersion;
     }
 
-    public void addRow(BigInteger token, Object[] values) throws IOException
+    public void addRow(BigInteger token, Map<String, Object> boundValues) throws IOException
     {
         if (minToken == null)
        {
             minToken = token;
         }
         maxToken = token;
-
-        cqlSSTableWriter.addRow(values);
+        cqlSSTableWriter.addRow(boundValues);
     }
 
     public void close(BulkWriterContext writerContext, int partitionId) throws IOException
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
index 03772bd..5749a07 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
@@ -171,6 +171,16 @@ public final class SqlToCqlTypeConverter implements Serializable
         }
     }
 
+    public static Converter<?> getIntegerConverter()
+    {
+        return INTEGER_CONVERTER;
+    }
+
+    public static Converter<?> getLongConverter()
+    {
+        return LONG_CONVERTER;
+    }
+
     private static Converter<?> determineCustomConvert(CqlField.CqlCustom customType)
     {
         Preconditions.checkArgument(customType.name().equalsIgnoreCase(CUSTOM), "Non-custom types are not supported");
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
new file mode 100644
index 0000000..21a07be
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+
+import java.io.Serializable;
+import java.time.Duration;
+
+public final class TTLOption implements Serializable
+{
+    private static final TTLOption FOREVER = new TTLOption(0);
+    private final String ttlColumnName;
+    private final Integer ttlInSeconds;
+
+    private TTLOption(String ttlColumnName)
+    {
+        this.ttlColumnName = ttlColumnName;
+        this.ttlInSeconds = null;
+    }
+
+    private TTLOption(int ttlInSeconds)
+    {
+        this.ttlInSeconds = ttlInSeconds;
+        this.ttlColumnName = null;
+    }
+
+    public static TTLOption from(String ttl)
+    {
+        if (ttl == null)
+        {
+            return FOREVER;
+        }
+        try
+        {
+            return new TTLOption(Integer.parseInt(ttl));
+        }
+        catch (Exception e)
+        {
+
+            return new TTLOption(ttl);
+        }
+    }
+
+    /**
+     * TTL option for write with a constant TTL. When same values for TTL should be used for all rows in a bulk write
+     * call use this option.
+     *
+     * @param ttlInSeconds ttl value in seconds
+     * @return TTLOption
+     */
+    public static String constant(int ttlInSeconds)
+    {
+        return String.valueOf(ttlInSeconds);
+    }
+
+    /**
+     * TTL option for write with a constant TTL. When same values for TTL should be used for all rows in a bulk write
+     * call use this option.
+     *
+     * @param duration ttl value in Duration
+     * @return TTLOption
+     */
+    public static String constant(Duration duration)
+    {
+        return String.valueOf(duration.getSeconds());
+    }
+
+    /**
+     * TTL option for writes with TTL per Row. When different TTL has to be used for different rows in a bulk write
+     * call use this option. It expects the input RDD to supply the TTL values as an additional column at each row of
+     * the RDD. The TTL value provider column is selected by {@code ttlColumnName}
+     *
+     * @param ttlColumnName column name which has TTL values for each row
+     * @return TTLOption
+     */
+    public static String perRow(String ttlColumnName)
+    {
+        return ttlColumnName;
+    }
+
+    public static TTLOption forever()
+    {
+        return FOREVER;
+    }
+
+    public String columnName()
+    {
+        return ttlColumnName;
+    }
+
+    public boolean withTTl()
+    {
+        return !this.equals(FOREVER)
+               && (ttlColumnName != null || ttlInSeconds != null);
+    }
+
+    @Override
+    public String toString()
+    {
+        if (ttlColumnName != null && !ttlColumnName.isEmpty())
+        {
+            return ":" + ttlColumnName;
+        }
+        if (ttlInSeconds != null)
+        {
+            return Integer.toString(ttlInSeconds);
+        }
+        return null;
+    }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 6e5b0ae..fa3371f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.base.Preconditions;
+import org.apache.cassandra.spark.data.CqlField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,10 +47,15 @@ public class TableSchema implements Serializable
     final List<SqlToCqlTypeConverter.Converter<?>> converters;
     private final List<Integer> keyFieldPositions;
     private final WriteMode writeMode;
+    private final TTLOption ttlOption;
+    private final TimestampOption timestampOption;
 
-    public TableSchema(StructType dfSchema, TableInfoProvider tableInfo, WriteMode writeMode)
+    public TableSchema(StructType dfSchema, TableInfoProvider tableInfo, WriteMode writeMode,
+                       TTLOption ttlOption, TimestampOption timestampOption)
     {
         this.writeMode = writeMode;
+        this.ttlOption = ttlOption;
+        this.timestampOption = timestampOption;
 
         validateDataFrameCompatibility(dfSchema, tableInfo);
         validateNoSecondaryIndexes(tableInfo);
@@ -58,7 +64,7 @@ public class TableSchema implements Serializable
         this.modificationStatement = getModificationStatement(dfSchema, tableInfo);
         this.partitionKeyColumns = getPartitionKeyColumnNames(tableInfo);
         this.partitionKeyColumnTypes = getPartitionKeyColumnTypes(tableInfo);
-        this.converters = getConverters(dfSchema, tableInfo);
+        this.converters = getConverters(dfSchema, tableInfo, ttlOption, timestampOption);
         LOGGER.info("Converters: {}", converters);
         this.keyFieldPositions = getKeyFieldPositions(dfSchema, tableInfo.getColumnNames(), getRequiredKeyColumns(tableInfo));
     }
@@ -100,12 +106,24 @@ public class TableSchema implements Serializable
     }
 
     private static List<SqlToCqlTypeConverter.Converter<?>> getConverters(StructType dfSchema,
-                                                                          TableInfoProvider tableInfo)
+                                                                          TableInfoProvider tableInfo,
+                                                                          TTLOption ttlOption,
+                                                                          TimestampOption timestampOption)
     {
         return Arrays.stream(dfSchema.fieldNames())
-                     .map(tableInfo::getColumnType)
-                     .map(SqlToCqlTypeConverter::getConverter)
-                     .collect(Collectors.toList());
+                     .map(fieldName -> {
+                         if (fieldName.equals(ttlOption.columnName()))
+                         {
+                             return SqlToCqlTypeConverter.getIntegerConverter();
+                         }
+                         if (fieldName.equals(timestampOption.columnName()))
+                         {
+                             return SqlToCqlTypeConverter.getLongConverter();
+                         }
+                         CqlField.CqlType cqlType = tableInfo.getColumnType(fieldName);
+                         return SqlToCqlTypeConverter.getConverter(cqlType);
+                     })
+                     .collect(Collectors.toList());
     }
 
     private static List<ColumnType<?>> getPartitionKeyColumnTypes(TableInfoProvider tableInfo)
@@ -130,7 +148,7 @@ public class TableSchema implements Serializable
         switch (writeMode)
         {
             case INSERT:
-                return getInsertStatement(dfSchema, tableInfo);
+                return getInsertStatement(dfSchema, tableInfo, ttlOption, timestampOption);
             case DELETE_PARTITION:
                 return getDeleteStatement(dfSchema, tableInfo);
             default:
@@ -138,15 +156,38 @@ public class TableSchema implements Serializable
         }
     }
 
-    private static String getInsertStatement(StructType dfSchema, TableInfoProvider tableInfo)
+    private static String getInsertStatement(StructType dfSchema, TableInfoProvider tableInfo,
+                                             TTLOption ttlOption, TimestampOption timestampOption)
     {
-        String insertStatement = String.format("INSERT INTO %s.%s (%s) VALUES (%s);",
-                                               tableInfo.getKeyspaceName(),
-                                               tableInfo.getName(),
-                                               String.join(",", dfSchema.fieldNames()),
-                                               Arrays.stream(dfSchema.fieldNames())
-                                                     .map(field -> "?")
-                                                     .collect(Collectors.joining(",")));
+        List<String> columnNames = Arrays.stream(dfSchema.fieldNames())
+                                         .filter(fieldName -> !fieldName.equals(ttlOption.columnName()))
+                                         .filter(fieldName -> !fieldName.equals(timestampOption.columnName()))
+                                         .collect(Collectors.toList());
+        StringBuilder stringBuilder = new StringBuilder("INSERT INTO ")
+                                      .append(tableInfo.getKeyspaceName())
+                                      .append(".").append(tableInfo.getName())
+                                      .append(columnNames.stream().collect(Collectors.joining(",", " (", ") ")))
+                                      .append("VALUES")
+                                      .append(columnNames.stream().map(columnName -> ":" + columnName).collect(Collectors.joining(",", " (", ")")));
+        if (ttlOption.withTTl() && timestampOption.withTimestamp())
+        {
+            stringBuilder.append(" USING TIMESTAMP ")
+                         .append(timestampOption)
+                         .append(" AND TTL ")
+                         .append(ttlOption);
+        }
+        else if (timestampOption.withTimestamp())
+        {
+            stringBuilder.append(" USING TIMESTAMP ")
+                         .append(timestampOption);
+        }
+        else if (ttlOption.withTTl())
+        {
+            stringBuilder.append(" USING TTL ")
+                         .append(ttlOption);
+        }
+        stringBuilder.append(";");
+        String insertStatement = stringBuilder.toString();
 
         LOGGER.info("CQL insert statement for the RDD {}", insertStatement);
         return insertStatement;
@@ -174,7 +215,7 @@ public class TableSchema implements Serializable
         switch (writeMode)
         {
             case INSERT:
-                validateDataframeFieldsInTable(tableInfo, dfFields);
+                validateDataframeFieldsInTable(tableInfo, dfFields, ttlOption, timestampOption);
                 return;
             case DELETE_PARTITION:
                 validateOnlyPartitionKeyColumnsInDataframe(tableInfo, dfFields);
@@ -204,12 +245,16 @@ public class TableSchema implements Serializable
                                     .collect(Collectors.joining(",")));
     }
 
-    private static void validateDataframeFieldsInTable(TableInfoProvider tableInfo, Set<String> dfFields)
+    private static void validateDataframeFieldsInTable(TableInfoProvider tableInfo, Set<String> dfFields,
+                                                       TTLOption ttlOption, TimestampOption timestampOption)
     {
         // Make sure all fields in DF schema are part of table
-        String unknownFields = dfFields.stream()
+        List<String> unknownFields = dfFields.stream()
                                        .filter(columnName -> !tableInfo.columnExists(columnName))
-                                       .collect(Collectors.joining(","));
+                                       .filter(columnName -> !columnName.equals(ttlOption.columnName()))
+                                       .filter(columnName -> !columnName.equals(timestampOption.columnName()))
+                                       .collect(Collectors.toList());
+
         Preconditions.checkArgument(unknownFields.isEmpty(),
                                     "Unknown fields in data frame => " + unknownFields);
     }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
new file mode 100644
index 0000000..06307b9
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+public final class TimestampOption implements Serializable
+{
+    private static final TimestampOption NOW = new TimestampOption(System.currentTimeMillis() * 1000);
+    private String timestampColumnName;
+    private Long timeStampInMicroSeconds;
+
+    private TimestampOption(String timestampColumnName)
+    {
+        this.timestampColumnName = timestampColumnName;
+    }
+
+    private TimestampOption(Long timeStampInMicroSeconds)
+    {
+        this.timeStampInMicroSeconds = timeStampInMicroSeconds;
+    }
+
+    public static TimestampOption from(String timestamp)
+    {
+        if (timestamp == null)
+        {
+            return NOW;
+        }
+        try
+        {
+            return new TimestampOption(Long.parseLong(timestamp));
+        }
+        catch (Exception e)
+        {
+
+            return new TimestampOption(timestamp);
+        }
+    }
+
+    /**
+     * Timestamp option for write with a constant timestamp. When same values for timestamp should be used for all rows in
+     * a bulk write call use this option.
+     *
+     * @param timestampInMicroSeconds timestamp value in microseconds
+     * @return timestamp option
+     */
+    public static String constant(long timestampInMicroSeconds)
+    {
+        return String.valueOf(timestampInMicroSeconds);
+    }
+
+    /**
+     * Timestamp option for write with a constant timestamp. When same values for timestamp should be used for all rows in
+     * a bulk write call use this option.
+     *
+     * @param duration timestamp value in Duration
+     * @return timestamp option
+     */
+    public static String constant(Duration duration)
+    {
+        return String.valueOf(duration.get(ChronoUnit.MICROS));
+    }
+
+    /**
+     * Timestamp option for writes with timestamp per Row. When different timestamp has to be used for different rows in
+     * a bulk write call use this option. It expects the input RDD to supply the timestamp values as an additional
+     * column at each row of the RDD. The timestamp value provider column is selected by {@code timeStampColumnName}
+     *
+     * @param timeStampColumnName column name which has timestamp values for each row
+     * @return timestamp option
+     */
+    public static String perRow(String timeStampColumnName)
+    {
+        return timeStampColumnName;
+    }
+
+    public static TimestampOption now()
+    {
+        return NOW;
+    }
+
+    public String columnName()
+    {
+        return timestampColumnName;
+    }
+
+    public boolean withTimestamp()
+    {
+        return !this.equals(NOW)
+               && (timestampColumnName != null || timeStampInMicroSeconds != null);
+    }
+
+    @Override
+    public String toString()
+    {
+        if (timestampColumnName != null && !timestampColumnName.isEmpty())
+        {
+            return ":" + timestampColumnName;
+        }
+        if (timeStampInMicroSeconds != null)
+        {
+            return Long.toString(timeStampInMicroSeconds);
+        }
+        return null;
+    }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 7b48df8..d824f05 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -43,5 +43,7 @@ public enum WriterOptions implements WriterOption
     TRUSTSTORE_BASE64_ENCODED,
     SIDECAR_PORT,
     ROW_BUFFER_MODE,
-    SSTABLE_DATA_SIZE_IN_MB
+    SSTABLE_DATA_SIZE_IN_MB,
+    TTL,
+    TIMESTAMP
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 262b7df..4cf29ca 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -85,6 +85,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
     private CommitResultSupplier crSupplier = (uuids, dc) -> new RemoteCommitResult(true, Collections.emptyList(), uuids, null);
     private Predicate<CassandraInstance> uploadRequestConsumer = instance -> true;
+    private TTLOption ttlOption = TTLOption.forever();
 
     public MockBulkWriterContext(CassandraRing<RingInstance> ring,
                                  String cassandraVersion,
@@ -111,6 +112,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
                                  .withPartitionKeyColumnTypes(partitionKeyColumnTypes)
                                  .withWriteMode(WriteMode.INSERT)
                                  .withDataFrameSchema(validDataFrameSchema)
+                                 .withTTLSetting(ttlOption)
                                  .build();
         this.jobId = java.util.UUID.randomUUID();
     }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
index 4003f54..716b7ee 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
@@ -64,13 +65,13 @@ public class MockTableWriter implements SSTableWriter
     }
 
     @Override
-    public void addRow(Object... values)
+    public void addRow(Map<String, Object> values)
     {
         if (addRowThrows)
         {
             throw new RuntimeException("Failed to write because addRow throws");
         }
-        rows.add(values);
+        rows.add(values.values().toArray());
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 4e60ec6..1ccdc72 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -48,6 +48,7 @@ public class RecordWriterTest
     public static final int REPLICA_COUNT = 3;
     public static final int FILES_PER_SSTABLE = 8;
     public static final int UPLOADED_TABLES = 3;
+    private static final String[] COLUMN_NAMES = {"id", "date", "course", "marks"};
 
     @Rule
     public TemporaryFolder folder = new TemporaryFolder();
@@ -75,17 +76,51 @@ public class RecordWriterTest
     @Test
     public void testSuccessfulWrite()
     {
-        rw = new RecordWriter(writerContext, () -> tc, SSTableWriter::new);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
-        rw.write(data);
-        Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
-        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
-        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
-        List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
-        for (UploadRequest ur: requests)
-        {
-            assertNotNull(ur.fileHash);
-        }
+        validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
+    }
+
+    @Test
+    public void testWriteWithConstantTTL()
+    {
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
+        validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+    }
+
+    @Test
+    public void testWriteWithTTLColumn()
+    {
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false);
+        String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
+        validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
+    }
+
+    @Test
+    public void testWriteWithConstantTimestamp()
+    {
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
+        validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+    }
+
+    @Test
+    public void testWriteWithTimestampColumn()
+    {
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, true);
+        String[] columnNamesWithTimestamp = {"id", "date", "course", "marks", "timestamp"};
+        validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTimestamp);
+    }
+
+    @Test
+    public void testWriteWithTimestampAndTTLColumn()
+    {
+        MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, true);
+        String[] columnNames = {"id", "date", "course", "marks", "ttl", "timestamp"};
+        validateSuccessfulWrite(bulkWriterContext, data, columnNames);
     }
 
     @Test
@@ -93,7 +128,7 @@ public class RecordWriterTest
     {
         // TODO: Add better error handling with human-readable exception messages in SSTableReader::new
         exception.expect(RuntimeException.class);
-        rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new SSTableWriter(tw.setOutDir(path), path));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw.setOutDir(path), path));
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
         rw.write(data);
         Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
@@ -104,7 +139,7 @@ public class RecordWriterTest
     @Test
     public void testWriteWithOutOfRangeTokenFails()
     {
-        rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
         exception.expectMessage("Received Token 5765203080415074583 outside of expected range [-9223372036854775808‥0]");
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, false);
         rw.write(data);
@@ -113,7 +148,7 @@ public class RecordWriterTest
     @Test
     public void testAddRowThrowingFails()
     {
-        rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
         tw.setAddRowThrows(true);
         exception.expectMessage("java.lang.RuntimeException: Failed to write because addRow throws");
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
@@ -125,7 +160,7 @@ public class RecordWriterTest
     {
         // Mock context returns a 60-minute allowable time skew, so we use something just outside the limits
         long sixtyOneMinutesInMillis = TimeUnit.MINUTES.toMillis(61);
-        rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
         writerContext.setTimeProvider(() -> System.currentTimeMillis() - sixtyOneMinutesInMillis);
         exception.expectMessage("Time skew between Spark and Cassandra is too large. Allowable skew is 60 minutes. Spark executor time is ");
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
@@ -138,16 +173,52 @@ public class RecordWriterTest
         // Mock context returns a 60-minute allowable time skew, so we use something just inside the limits
         long fiftyNineMinutesInMillis = TimeUnit.MINUTES.toMillis(59);
         long remoteTime = System.currentTimeMillis() - fiftyNineMinutesInMillis;
-        rw = new RecordWriter(writerContext, () -> tc, SSTableWriter::new);
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new);
         writerContext.setTimeProvider(() -> remoteTime);  // Return a very low "current time" to make sure we fail if skew is too bad
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
         rw.write(data);
     }
 
+    private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
+                                         Iterator<Tuple2<DecoratedKey, Object[]>> data,
+                                         String[] columnNames)
+    {
+        RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
+        rw.write(data);
+        Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
+        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+        List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+        for (UploadRequest ur: requests)
+        {
+            assertNotNull(ur.fileHash);
+        }
+    }
+
     private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int numValues, boolean onlyInRange)
+    {
+        return generateData(numValues, onlyInRange, false, false);
+    }
+
+    private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int numValues, boolean onlyInRange, boolean withTTL, boolean withTimestamp)
     {
         Stream<Tuple2<DecoratedKey, Object[]>> source = IntStream.iterate(0, integer -> integer + 1).mapToObj(index -> {
-            Object[] columns = {index, index, "foo" + index, index};
+            Object[] columns;
+            if (withTTL && withTimestamp)
+            {
+                columns = new Object[]{index, index, "foo" + index, index, index * 100, System.currentTimeMillis() * 1000};
+            }
+            else if (withTimestamp)
+            {
+                columns = new Object[]{index, index, "foo" + index, index, System.currentTimeMillis() * 1000};
+            }
+            else if (withTTL)
+            {
+                columns = new Object[]{index, index, "foo" + index, index, index * 100};
+            }
+            else
+            {
+                columns = new Object[]{index, index, "foo" + index, index};
+            }
             return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns);
         });
         if (onlyInRange)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
index 8bee484..3b3511c 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -90,7 +91,7 @@ public class SSTableWriterTest
     {
         MockBulkWriterContext writerContext = new MockBulkWriterContext(ring, version, ConsistencyLevel.CL.LOCAL_QUORUM);
         SSTableWriter tw = new SSTableWriter(writerContext, tmpDir.getRoot().toPath());
-        tw.addRow(BigInteger.ONE, new Object[]{1, 1, "foo", 1});
+        tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "foo", "marks", 1));
         tw.close(writerContext, 1);
         try (DirectoryStream<Path> dataFileStream = Files.newDirectoryStream(tw.getOutDir(), "*Data.db"))
         {
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 0c6d5db..eb69631 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -24,6 +24,7 @@ import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -67,6 +68,7 @@ public class StreamSessionConsistencyTest
                                                                           ImmutableMap.of("DC1", 3, "DC2", 3),
                                                                           "test",
                                                                           6);
+    private static final Map<String, Object> COLUMN_BIND_VALUES = ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
 
     @Rule
     public TemporaryFolder folder = new TemporaryFolder();
@@ -119,8 +121,7 @@ public class StreamSessionConsistencyTest
             }
         });
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES);
         tr.close(writerContext, 1);
         streamSession.scheduleStream(tr);
         if (shouldFail)
@@ -157,8 +158,7 @@ public class StreamSessionConsistencyTest
         boolean shouldFail = calculateFailure(dc1Failures.get(), dc2Failures.get());
         writerContext.setUploadSupplier(instance -> dcFailures.get(instance.getDataCenter()).getAndDecrement() <= 0);
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES);
         tr.close(writerContext, 1);
         streamSession.scheduleStream(tr);
         if (shouldFail)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
index bf79c31..e5e5ab1 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import org.junit.Before;
@@ -52,6 +54,7 @@ import static org.junit.Assert.assertTrue;
 public class StreamSessionTest
 {
     public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to load 1 ranges with LOCAL_QUORUM";
+    private static final Map<String, Object> COLUMN_BOUND_VALUES = ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
     @Rule
     public TemporaryFolder folder = new TemporaryFolder();
     private static final int FILES_PER_SSTABLE = 8;
@@ -90,8 +93,7 @@ public class StreamSessionTest
     ) throws IOException, ExecutionException, InterruptedException
     {
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         ss.close();  // Force "execution" of futures
@@ -121,8 +123,7 @@ public class StreamSessionTest
     public void testMismatchedTokenRangeFails() throws IOException
     {
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(9999L), row);
+        tr.addRow(BigInteger.valueOf(9999L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         IllegalStateException illegalStateException = assertThrows(IllegalStateException.class,
                                                                    () -> ss.scheduleStream(tr));
@@ -170,8 +171,7 @@ public class StreamSessionTest
     {
         assertThrows(RuntimeException.class, () -> {
             SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, tableWriter.getOutDir());
-            Object[] row = {0, 1, "course", 2};
-            tr.addRow(BigInteger.valueOf(102L), row);
+            tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
             tr.close(writerContext, 1);
             tableWriter.removeOutDir();
             ss.scheduleStream(tr);
@@ -190,8 +190,7 @@ public class StreamSessionTest
         writerContext.setInstancesAreAvailable(false);
         ss = new StreamSession(writerContext, "sessionId", range, executor);
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         assertThrows(LOAD_RANGE_ERROR_PREFIX, RuntimeException.class, () -> ss.close());
@@ -234,8 +233,7 @@ public class StreamSessionTest
             }
         });
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         ss.close();  // Force "execution" of futures
@@ -267,8 +265,7 @@ public class StreamSessionTest
             }
         });
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         RuntimeException exception = assertThrows(RuntimeException.class, () -> ss.close());  // Force "execution" of futures
@@ -289,8 +286,7 @@ public class StreamSessionTest
     {
         writerContext.setUploadSupplier(instance -> false);
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         assertThrows(LOAD_RANGE_ERROR_PREFIX, RuntimeException.class, () -> ss.close());
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index c045492..895ca06 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -72,7 +72,44 @@ public class TableSchemaTest
         TableSchema schema = getValidSchemaBuilder()
                              .build();
         assertThat(schema.modificationStatement,
-                   is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (?,?,?,?);")));
+                   is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks);")));
+    }
+
+    @Test
+    public void testInsertStatementWithConstantTTL()
+    {
+        TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("1000")).build();
+        assertThat(schema.modificationStatement, is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 1000;")));
+    }
+
+    @Test
+    public void testInsertStatementWithTTLColumn()
+    {
+        TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).build();
+        assertThat(schema.modificationStatement, is(equalTo("INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL :ttl;")));
+    }
+
+    @Test
+    public void testInsertStatementWithConstantTimestamp()
+    {
+        TableSchema schema = getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("1000")).build();
+        String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP 1000;";
+        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+    }
+
+    @Test
+    public void testInsertStatementWithTimestampColumn()
+    {
+        TableSchema schema = getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("timestamp")).build();
+        String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp;";
+        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+    }
+
+    @Test
+    public void testInsertStatementWithTTLAndTimestampColumn()
+    {
+        TableSchema schema = getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).withTimeStampSetting(TimestampOption.from("timestamp")).build();
+        String expectedQuery = "INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp AND TTL :ttl;";
+        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
     }
 
     @Test
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index f393f35..18ce2a6 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -32,6 +32,7 @@ import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.cassandra.spark.common.schema.ColumnType;
 import org.apache.cassandra.spark.data.CqlField;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 
@@ -151,6 +152,8 @@ public final class TableSchemaTestCommon
         private ColumnType[] partitionKeyColumnTypes;
         private StructType dataFrameSchema;
         private WriteMode writeMode = null;
+        private TTLOption ttlOption = TTLOption.forever();
+        private TimestampOption timestampOption = TimestampOption.now();
 
         public MockTableSchemaBuilder withCqlColumns(@NotNull Map<String, CqlField.CqlType> cqlColumns)
         {
@@ -207,6 +210,18 @@ public final class TableSchemaTestCommon
             return this;
         }
 
+        public MockTableSchemaBuilder withTTLSetting(TTLOption ttlOption)
+        {
+            this.ttlOption = ttlOption;
+            return this;
+        }
+
+        public MockTableSchemaBuilder withTimeStampSetting(TimestampOption timestampOption)
+        {
+            this.timestampOption = timestampOption;
+            return this;
+        }
+
         public TableSchema build()
         {
             Objects.requireNonNull(cqlColumns,
@@ -228,7 +243,15 @@ public final class TableSchemaTestCommon
                                                                           partitionKeyColumnTypes,
                                                                           primaryKeyColumnNames,
                                                                           cassandraVersion);
-            return new TableSchema(dataFrameSchema, tableInfoProvider, writeMode);
+            if (ttlOption.withTTl() && ttlOption.columnName() != null)
+            {
+                dataFrameSchema = dataFrameSchema.add("ttl", DataTypes.IntegerType);
+            }
+            if (timestampOption.withTimestamp() && timestampOption.columnName() != null)
+            {
+                dataFrameSchema = dataFrameSchema.add("timestamp", DataTypes.IntegerType);
+            }
+            return new TableSchema(dataFrameSchema, tableInfoProvider, writeMode, ttlOption, timestampOption);
         }
     }
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
index 9ae1d52..9ec752d 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
@@ -21,8 +21,9 @@ package org.apache.cassandra.bridge;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
 
 public interface SSTableWriter extends Closeable
 {
-    void addRow(Object... values) throws IOException;
+    void addRow(Map<String, Object> values) throws IOException;
 }
diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
index c53fc25..05fb8bd 100644
--- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
+++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.bridge;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.dht.IPartitioner;
@@ -62,7 +63,7 @@ public class SSTableWriterImplementation implements SSTableWriter
     }
 
     @Override
-    public void addRow(Object... values) throws IOException
+    public void addRow(Map<String, Object> values) throws IOException
     {
         try
         {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org