This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9523a38  CASSANDRA-18605 Adding support for TTL & Timestamps for bulk writes
9523a38 is described below

commit 9523a38b3f1b5bc4313e2949896ddc1fff58afbe
Author: jkonisa <jkon...@apple.com>
AuthorDate: Thu Jun 15 13:31:01 2023 -0700

    CASSANDRA-18605 Adding support for TTL & Timestamps for bulk writes
    
    This commit introduces a new feature in Spark Bulk Writer to support writes with
    constant/per_row based TTL & Timestamps.
    
    Patch by Jyothsna Konisa; Reviewed by Dinesh Joshi, Francisco Guerrero, Yifan Cai for CASSANDRA-18605
---
 CHANGES.txt                                        |   3 +-
 .../spark/example/SampleCassandraJob.java          |  54 +++++++--
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  |  14 +++
 .../bulkwriter/CassandraBulkSourceRelation.java    |  26 ++---
 .../bulkwriter/CassandraBulkWriterContext.java     |   2 +-
 .../cassandra/spark/bulkwriter/RecordWriter.java   |  27 ++++-
 .../cassandra/spark/bulkwriter/SSTableWriter.java  |   5 +-
 .../spark/bulkwriter/SqlToCqlTypeConverter.java    |  10 ++
 .../cassandra/spark/bulkwriter/TTLOption.java      | 127 +++++++++++++++++++++
 .../cassandra/spark/bulkwriter/TableSchema.java    |  83 +++++++++++---
 .../spark/bulkwriter/TimestampOption.java          | 125 ++++++++++++++++++++
 .../cassandra/spark/bulkwriter/WriterOptions.java  |   4 +-
 .../spark/bulkwriter/MockBulkWriterContext.java    |   2 +
 .../spark/bulkwriter/MockTableWriter.java          |   5 +-
 .../spark/bulkwriter/RecordWriterTest.java         | 103 ++++++++++++++---
 .../spark/bulkwriter/SSTableWriterTest.java        |   3 +-
 .../bulkwriter/StreamSessionConsistencyTest.java   |   8 +-
 .../spark/bulkwriter/StreamSessionTest.java        |  24 ++--
 .../spark/bulkwriter/TableSchemaTest.java          |  39 ++++++-
 .../spark/bulkwriter/TableSchemaTestCommon.java    |  25 +++-
 .../org/apache/cassandra/bridge/SSTableWriter.java |   3 +-
 .../bridge/SSTableWriterImplementation.java        |   3 +-
 22 files changed, 600 insertions(+), 95 deletions(-)
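
For orientation only (not part of the commit): a minimal sketch of how a caller would enable the
new TTL/timestamp writer options added by this patch. The keyspace, table, and the Dataset<Row>
named df are placeholders; the option names and the TTLOption/TimestampOption helpers are the ones
introduced in the diff below (see also the updated SampleCassandraJob).

    import org.apache.cassandra.spark.bulkwriter.TTLOption;
    import org.apache.cassandra.spark.bulkwriter.TimestampOption;
    import org.apache.cassandra.spark.bulkwriter.WriterOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    final class BulkWriteTtlTimestampSketch
    {
        // df carries extra "ttl" and "timestamp" columns when the per-row variant is used
        static void write(Dataset<Row> df)
        {
            df.write()
              .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
              .option("keyspace", "spark_test")   // placeholder keyspace
              .option("table", "test")            // placeholder table
              // Constant variant: one TTL/timestamp applied to every row
              // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
              // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
              // Per-row variant: values read from dedicated DataFrame columns
              .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
              .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
              .mode("append")
              .save();
        }
    }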

diff --git a/CHANGES.txt b/CHANGES.txt
index 3632517..eeaee1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Add support for TTL & Timestamps for bulk writes (CASSANDRA-18605)
  * Add circleci configuration yaml for Cassandra Analytics (CASSANDRA-18578)
  * Provide a SecretsProvider interface to abstract the secret provisioning (CASSANDRA-18545)
- * Add the .asf.yaml file (CASSANDRA-18548)
\ No newline at end of file
+ * Add the .asf.yaml file (CASSANDRA-18548)
diff --git a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
index 4e21749..58c9827 100644
--- a/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
+++ b/cassandra-analytics-core-example/src/main/java/org/apache/cassandra/spark/example/SampleCassandraJob.java
@@ -28,6 +28,9 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 
+import org.apache.cassandra.spark.bulkwriter.TTLOption;
+import org.apache.cassandra.spark.bulkwriter.TimestampOption;
+import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +49,7 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.StructType;
 
 import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.apache.spark.sql.types.DataTypes.LongType;
 
 /**
@@ -122,15 +126,12 @@ public final class SampleCassandraJob
 
     private static Dataset<Row> write(long rowCount, SparkConf sparkConf, SQLContext sql, SparkContext sc)
     {
-        StructType schema = new StructType()
-                            .add("id", LongType, false)
-                            .add("course", BinaryType, false)
-                            .add("marks", LongType, false);
-
         JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sc);
         int parallelism = sc.defaultParallelism();
-        JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism);
-        Dataset<Row> df = sql.createDataFrame(rows, schema);
+        boolean addTTLColumn = true;
+        boolean addTimestampColumn = true;
+        JavaRDD<Row> rows = genDataset(javaSparkContext, rowCount, parallelism, addTTLColumn, addTimestampColumn);
+        Dataset<Row> df = sql.createDataFrame(rows, getWriteSchema(addTTLColumn, addTimestampColumn));
 
         df.write()
           .format("org.apache.cassandra.spark.sparksql.CassandraDataSink")
@@ -140,6 +141,11 @@ public final class SampleCassandraJob
           .option("local_dc", "datacenter1")
           .option("bulk_writer_cl", "LOCAL_QUORUM")
           .option("number_splits", "-1")
+          // A constant timestamp and TTL can be used by setting the following options.
+          // .option(WriterOptions.TIMESTAMP.name(), TimestampOption.constant(System.currentTimeMillis() * 1000))
+          // .option(WriterOptions.TTL.name(), TTLOption.constant(20))
+          .option(WriterOptions.TTL.name(), TTLOption.perRow("ttl"))
+          .option(WriterOptions.TIMESTAMP.name(), TimestampOption.perRow("timestamp"))
           .mode("append")
           .save();
         return df;
@@ -174,6 +180,23 @@ public final class SampleCassandraJob
         return df;
     }
 
+    private static StructType getWriteSchema(boolean addTTLColumn, boolean addTimestampColumn)
+    {
+        StructType schema = new StructType()
+                .add("id", LongType, false)
+                .add("course", BinaryType, false)
+                .add("marks", LongType, false);
+        if (addTTLColumn)
+        {
+            schema = schema.add("ttl", IntegerType, false);
+        }
+        if (addTimestampColumn)
+        {
+            schema = schema.add("timestamp", LongType, false);
+        }
+        return schema;
+    }
+
     private static void checkSmallDataFrameEquality(Dataset<Row> expected, Dataset<Row> actual)
     {
         if (actual == null)
@@ -186,11 +209,14 @@ public final class SampleCassandraJob
         }
     }
 
-    private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records, Integer parallelism)
+    private static JavaRDD<Row> genDataset(JavaSparkContext sc, long records, Integer parallelism,
+                                           boolean addTTLColumn, boolean addTimestampColumn)
     {
         long recordsPerPartition = records / parallelism;
         long remainder = records - (recordsPerPartition * parallelism);
         List<Integer> seq = IntStream.range(0, parallelism).boxed().collect(Collectors.toList());
+        int ttl = 10;
+        long timeStamp = System.currentTimeMillis() * 1000;
         JavaRDD<Row> dataset = sc.parallelize(seq, parallelism).mapPartitionsWithIndex(
                 (Function2<Integer, Iterator<Integer>, Iterator<Row>>) (index, integerIterator) -> {
                     long firstRecordNumber = index * recordsPerPartition;
@@ -201,6 +227,18 @@ public final class SampleCassandraJob
                        Integer courseNameStringLen = courseNameString.length();
                        Integer courseNameMultiplier = 1000 / courseNameStringLen;
                        byte[] courseName = dupStringAsBytes(courseNameString, courseNameMultiplier);
+                        if (addTTLColumn && addTimestampColumn)
+                        {
+                            return RowFactory.create(recordNumber, courseName, recordNumber, ttl, timeStamp);
+                        }
+                        if (addTTLColumn)
+                        {
+                            return RowFactory.create(recordNumber, courseName, recordNumber, ttl);
+                        }
+                        if (addTimestampColumn)
+                        {
+                            return RowFactory.create(recordNumber, courseName, recordNumber, timeStamp);
+                        }
                        return RowFactory.create(recordNumber, courseName, recordNumber);
                     }).iterator();
                     return rows;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 292282f..ee63df9 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -119,6 +119,8 @@ public class BulkSparkConf implements Serializable
     protected final String truststorePath;
     protected final String truststoreBase64Encoded;
     protected final String truststoreType;
+    protected final String ttl;
+    protected final String timestamp;
     protected final SparkConf conf;
     public final boolean validateSSTables;
     public final int commitThreadsPerInstance;
@@ -157,6 +159,8 @@ public class BulkSparkConf implements Serializable
         // else fall back to props, and then default if neither specified
         this.useOpenSsl = getBoolean(USE_OPENSSL, true);
         this.ringRetryCount = getInt(RING_RETRY_COUNT, DEFAULT_RING_RETRY_COUNT);
+        this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), null);
+        this.timestamp = MapUtils.getOrDefault(options, WriterOptions.TIMESTAMP.name(), null);
         validateEnvironment();
     }
 
@@ -241,6 +245,16 @@ public class BulkSparkConf implements Serializable
         return truststorePath;
     }
 
+    protected TTLOption getTTLOptions()
+    {
+        return TTLOption.from(ttl);
+    }
+
+    protected TimestampOption getTimestampOptions()
+    {
+        return TimestampOption.from(timestamp);
+    }
+
     protected String getTruststoreBase64Encoded()
     {
         return truststoreBase64Encoded;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
index 15d4371..8f2c0b5 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java
@@ -19,8 +19,9 @@
 
 package org.apache.cassandra.spark.bulkwriter;
 
-import java.io.Serializable;
+import java.util.Iterator;
 
+import org.apache.spark.api.java.function.VoidFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +103,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
                 .map(tableSchema::normalize)
                 .keyBy(tokenizer::getDecoratedKey)
                 .repartitionAndSortWithinPartitions(broadcastContext.getValue().job().getTokenPartitioner());
-        persist(sortedRDD);
+        persist(sortedRDD, data.columns());
     }
 
     private void cancelJob(@NotNull CancelJobEvent cancelJobEvent)
@@ -121,7 +122,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
     }
 
     @SuppressWarnings("RedundantCast")
-    private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD)
+    private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, String[] columnNames)
     {
         writeValidator.setPhase("Environment Validation");
         writeValidator.validateInitialEnvironment();
@@ -129,7 +130,7 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
 
         try
         {
-            sortedRDD.foreachPartition(new WriteIterator(broadcastContext)::call);
+            sortedRDD.foreachPartition(writeRowsInPartition(broadcastContext, columnNames));
             writeValidator.failIfRingChanged();
         }
         catch (Throwable throwable)
@@ -168,18 +169,11 @@ public class CassandraBulkSourceRelation extends BaseRelation implements Inserta
         }
     }
 
-    private static class WriteIterator implements Serializable
+    // Made this function static to avoid capturing reference to CassandraBulkSourceRelation object which cannot be
+    // serialized.
+    private static VoidFunction<Iterator<Tuple2<DecoratedKey, Object[]>>> writeRowsInPartition(Broadcast<BulkWriterContext> broadcastContext,
+                                                                                                String[] columnNames)
     {
-        private final Broadcast<BulkWriterContext> broadcastContext;
-
-        WriteIterator(Broadcast<BulkWriterContext> broadcastContext)
-        {
-            this.broadcastContext = broadcastContext;
-        }
-
-        public void call(java.util.Iterator<Tuple2<DecoratedKey, Object[]>> iterator)
-        {
-            new RecordWriter(broadcastContext.getValue()).write(iterator);
-        }
+        return itr -> new RecordWriter(broadcastContext.getValue(), columnNames).write(itr);
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
index 4813b1c..2709b51 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkWriterContext.java
@@ -91,7 +91,7 @@ public class CassandraBulkWriterContext implements BulkWriterContext, KryoSerial
         CqlTable cqlTable = bridge.buildSchema(tableSchema, keyspace, replicationFactor, partitioner, udts, null, indexCount);
 
         TableInfoProvider tableInfoProvider = new CqlTableInfoProvider(tableSchema, cqlTable);
-        schemaInfo = new CassandraSchemaInfo(new TableSchema(dfSchema, tableInfoProvider, conf.writeMode));
+        schemaInfo = new CassandraSchemaInfo(new TableSchema(dfSchema, tableInfoProvider, conf.writeMode, conf.getTTLOptions(), conf.getTimestampOptions()));
     }
 
     public static BulkWriterContext fromOptions(@NotNull SparkContext sparkContext,
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index a5aaf8d..32b8632 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -27,8 +27,10 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.function.Supplier;
@@ -52,23 +54,26 @@ public class RecordWriter implements Serializable
     private static final Logger LOGGER = LoggerFactory.getLogger(RecordWriter.class);
 
     private final BulkWriterContext writerContext;
+    private final String[] columnNames;
     private Supplier<TaskContext> taskContextSupplier;
     private final BiFunction<BulkWriterContext, Path, SSTableWriter> tableWriterSupplier;
     private SSTableWriter sstableWriter = null;
     private int batchNumber = 0;
     private int batchSize = 0;
 
-    public RecordWriter(BulkWriterContext writerContext)
+    public RecordWriter(BulkWriterContext writerContext, String[] columnNames)
     {
-        this(writerContext, TaskContext::get, SSTableWriter::new);
+        this(writerContext, columnNames, TaskContext::get, SSTableWriter::new);
     }
 
     @VisibleForTesting
     RecordWriter(BulkWriterContext writerContext,
+                 String[] columnNames,
                  Supplier<TaskContext> taskContextSupplier,
                 BiFunction<BulkWriterContext, Path, SSTableWriter> tableWriterSupplier)
     {
         this.writerContext = writerContext;
+        this.columnNames = columnNames;
         this.taskContextSupplier = taskContextSupplier;
         this.tableWriterSupplier = tableWriterSupplier;
     }
@@ -99,12 +104,13 @@ public class RecordWriter implements Serializable
                                 Integer.toString(taskContext.stageAttemptNumber()),
                                  Integer.toString(taskContext.attemptNumber()),
                                  Integer.toString(partitionId));
+        Map<String, Object> valueMap = new HashMap<>();
         try
         {
             while (dataIterator.hasNext())
             {
                 maybeCreateTableWriter(partitionId, baseDir);
-                writeRow(dataIterator, partitionId, range);
+                writeRow(valueMap, dataIterator, partitionId, range);
                 checkBatchSize(streamSession, partitionId, job);
             }
 
@@ -138,7 +144,8 @@ public class RecordWriter implements Serializable
         }
     }
 
-    public void writeRow(scala.collection.Iterator<Tuple2<DecoratedKey, Object[]>> dataIterator,
+    public void writeRow(Map<String, Object> valueMap,
+                         scala.collection.Iterator<Tuple2<DecoratedKey, Object[]>> dataIterator,
                          int partitionId,
                          Range<BigInteger> range) throws IOException
     {
@@ -149,7 +156,7 @@ public class RecordWriter implements Serializable
                                 String.format("Received Token %s outside of expected range %s", token, range));
         try
         {
-            sstableWriter.addRow(token, tuple._2());
+            sstableWriter.addRow(token, getBindValuesForColumns(valueMap, columnNames, tuple._2()));
         }
         catch (RuntimeException exception)
         {
@@ -186,6 +193,16 @@ public class RecordWriter implements Serializable
         }
     }
 
+    private static Map<String, Object> getBindValuesForColumns(Map<String, Object> map, String[] columnNames, Object[] values)
+    {
+        assert values.length == columnNames.length : "Number of values does not match the number of columns " + values.length + ", " + columnNames.length;
+        for (int i = 0; i < columnNames.length; i++)
+        {
+            map.put(columnNames[i], values[i]);
+        }
+        return map;
+    }
+
     private void finalizeSSTable(StreamSession streamSession,
                                  int partitionId,
                                  SSTableWriter sstableWriter,
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index bf982ef..df6f155 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -91,15 +91,14 @@ public class SSTableWriter
         return CASSANDRA_VERSION_PREFIX + lowestCassandraVersion;
     }
 
-    public void addRow(BigInteger token, Object[] values) throws IOException
+    public void addRow(BigInteger token, Map<String, Object> boundValues) throws IOException
     {
         if (minToken == null)
         {
             minToken = token;
         }
         maxToken = token;
-
-        cqlSSTableWriter.addRow(values);
+        cqlSSTableWriter.addRow(boundValues);
     }
 
     public void close(BulkWriterContext writerContext, int partitionId) throws IOException
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
index 03772bd..5749a07 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
@@ -171,6 +171,16 @@ public final class SqlToCqlTypeConverter implements Serializable
         }
     }
 
+    public static Converter<?> getIntegerConverter()
+    {
+        return INTEGER_CONVERTER;
+    }
+
+    public static Converter<?> getLongConverter()
+    {
+        return LONG_CONVERTER;
+    }
+
     private static Converter<?> determineCustomConvert(CqlField.CqlCustom customType)
     {
         Preconditions.checkArgument(customType.name().equalsIgnoreCase(CUSTOM), "Non-custom types are not supported");
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
new file mode 100644
index 0000000..21a07be
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TTLOption.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+
+import java.io.Serializable;
+import java.time.Duration;
+
+public final class TTLOption implements Serializable
+{
+    private static final TTLOption FOREVER = new TTLOption(0);
+    private final String ttlColumnName;
+    private final Integer ttlInSeconds;
+
+    private TTLOption(String ttlColumnName)
+    {
+        this.ttlColumnName = ttlColumnName;
+        this.ttlInSeconds = null;
+    }
+
+    private TTLOption(int ttlInSeconds)
+    {
+        this.ttlInSeconds = ttlInSeconds;
+        this.ttlColumnName = null;
+    }
+
+    public static TTLOption from(String ttl)
+    {
+        if (ttl == null)
+        {
+            return FOREVER;
+        }
+        try
+        {
+            return new TTLOption(Integer.parseInt(ttl));
+        }
+        catch (Exception e)
+        {
+
+            return new TTLOption(ttl);
+        }
+    }
+
+    /**
+     * TTL option for write with a constant TTL. When same values for TTL should be used for all rows in a bulk write
+     * call use this option.
+     *
+     * @param ttlInSeconds ttl value in seconds
+     * @return TTLOption
+     */
+    public static String constant(int ttlInSeconds)
+    {
+        return String.valueOf(ttlInSeconds);
+    }
+
+    /**
+     * TTL option for write with a constant TTL. When same values for TTL should be used for all rows in a bulk write
+     * call use this option.
+     *
+     * @param duration ttl value in Duration
+     * @return TTLOption
+     */
+    public static String constant(Duration duration)
+    {
+        return String.valueOf(duration.getSeconds());
+    }
+
+    /**
+     * TTL option for writes with TTL per Row. When different TTL has to be used for different rows in a bulk write
+     * call use this option. It expects the input RDD to supply the TTL values as an additional column at each row of
+     * the RDD. The TTL value provider column is selected by {@code  ttlColumnName}
+     *
+     * @param ttlColumnName column name which has TTL values for each row
+     * @return TTLOption
+     */
+    public static String perRow(String ttlColumnName)
+    {
+        return ttlColumnName;
+    }
+
+    public static TTLOption forever()
+    {
+        return FOREVER;
+    }
+
+    public String columnName()
+    {
+        return ttlColumnName;
+    }
+
+    public boolean withTTl()
+    {
+        return !this.equals(FOREVER)
+                && (ttlColumnName != null || ttlInSeconds != null);
+    }
+
+    @Override
+    public String toString()
+    {
+        if (ttlColumnName != null && !ttlColumnName.isEmpty())
+        {
+            return ":" + ttlColumnName;
+        }
+        if (ttlInSeconds != null)
+        {
+            return Integer.toString(ttlInSeconds);
+        }
+        return null;
+    }
+}
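
A note on how the option value round-trips (illustrative only, not part of the patch): constant()
and perRow() simply produce the String stored in the Spark options map, and TTLOption.from()
re-parses it on the executor side; numeric strings become a constant TTL, any other string is
treated as a per-row column name, and a missing option falls back to forever():

    String constant = TTLOption.constant(20);   // "20"
    String perRow   = TTLOption.perRow("ttl");  // "ttl"
    TTLOption a = TTLOption.from(constant);     // constant TTL; toString() renders "20"
    TTLOption b = TTLOption.from(perRow);       // per-row TTL; toString() renders ":ttl" (a named bind marker)
    TTLOption c = TTLOption.from(null);         // option absent -> forever(); no TTL clause is emitted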
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
index 6e5b0ae..fa3371f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TableSchema.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.base.Preconditions;
+import org.apache.cassandra.spark.data.CqlField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,10 +47,15 @@ public class TableSchema implements Serializable
     final List<SqlToCqlTypeConverter.Converter<?>> converters;
     private final List<Integer> keyFieldPositions;
     private final WriteMode writeMode;
+    private final TTLOption ttlOption;
+    private final TimestampOption timestampOption;
 
-    public TableSchema(StructType dfSchema, TableInfoProvider tableInfo, WriteMode writeMode)
+    public TableSchema(StructType dfSchema, TableInfoProvider tableInfo, WriteMode writeMode,
+                       TTLOption ttlOption, TimestampOption timestampOption)
     {
         this.writeMode = writeMode;
+        this.ttlOption = ttlOption;
+        this.timestampOption = timestampOption;
 
         validateDataFrameCompatibility(dfSchema, tableInfo);
         validateNoSecondaryIndexes(tableInfo);
@@ -58,7 +64,7 @@ public class TableSchema implements Serializable
         this.modificationStatement = getModificationStatement(dfSchema, tableInfo);
         this.partitionKeyColumns = getPartitionKeyColumnNames(tableInfo);
         this.partitionKeyColumnTypes = getPartitionKeyColumnTypes(tableInfo);
-        this.converters = getConverters(dfSchema, tableInfo);
+        this.converters = getConverters(dfSchema, tableInfo, ttlOption, timestampOption);
         LOGGER.info("Converters: {}", converters);
         this.keyFieldPositions = getKeyFieldPositions(dfSchema, tableInfo.getColumnNames(), getRequiredKeyColumns(tableInfo));
     }
@@ -100,12 +106,24 @@ public class TableSchema implements Serializable
     }
 
     private static List<SqlToCqlTypeConverter.Converter<?>> getConverters(StructType dfSchema,
-                                                                          TableInfoProvider tableInfo)
+                                                                          TableInfoProvider tableInfo,
+                                                                          TTLOption ttlOption,
+                                                                          TimestampOption timestampOption)
     {
         return Arrays.stream(dfSchema.fieldNames())
-                     .map(tableInfo::getColumnType)
-                     .map(SqlToCqlTypeConverter::getConverter)
-                     .collect(Collectors.toList());
+                     .map(fieldName -> {
+                         if (fieldName.equals(ttlOption.columnName()))
+                         {
+                             return SqlToCqlTypeConverter.getIntegerConverter();
+                         }
+                         if (fieldName.equals(timestampOption.columnName()))
+                         {
+                             return SqlToCqlTypeConverter.getLongConverter();
+                         }
+                         CqlField.CqlType cqlType = tableInfo.getColumnType(fieldName);
+                         return SqlToCqlTypeConverter.getConverter(cqlType);
+                     })
+                    .collect(Collectors.toList());
     }
 
     private static List<ColumnType<?>> getPartitionKeyColumnTypes(TableInfoProvider tableInfo)
@@ -130,7 +148,7 @@ public class TableSchema implements Serializable
         switch (writeMode)
         {
             case INSERT:
-                return getInsertStatement(dfSchema, tableInfo);
+                return getInsertStatement(dfSchema, tableInfo, ttlOption, timestampOption);
             case DELETE_PARTITION:
                 return getDeleteStatement(dfSchema, tableInfo);
             default:
@@ -138,15 +156,38 @@ public class TableSchema implements Serializable
         }
     }
 
-    private static String getInsertStatement(StructType dfSchema, TableInfoProvider tableInfo)
+    private static String getInsertStatement(StructType dfSchema, TableInfoProvider tableInfo,
+                                             TTLOption ttlOption, TimestampOption timestampOption)
     {
-        String insertStatement = String.format("INSERT INTO %s.%s (%s) VALUES (%s);",
-                                               tableInfo.getKeyspaceName(),
-                                               tableInfo.getName(),
-                                               String.join(",", dfSchema.fieldNames()),
-                                               Arrays.stream(dfSchema.fieldNames())
-                                                     .map(field -> "?")
-                                                     .collect(Collectors.joining(",")));
+        List<String> columnNames = Arrays.stream(dfSchema.fieldNames())
+                                         .filter(fieldName -> !fieldName.equals(ttlOption.columnName()))
+                                         .filter(fieldName -> !fieldName.equals(timestampOption.columnName()))
+                                         .collect(Collectors.toList());
+        StringBuilder stringBuilder = new StringBuilder("INSERT INTO ")
+                .append(tableInfo.getKeyspaceName())
+                .append(".").append(tableInfo.getName())
+                .append(columnNames.stream().collect(Collectors.joining(",", " (", ") ")))
+                .append("VALUES")
+                .append(columnNames.stream().map(columnName -> ":" + columnName).collect(Collectors.joining(",", " (", ")")));
+        if (ttlOption.withTTl() && timestampOption.withTimestamp())
+        {
+            stringBuilder.append(" USING TIMESTAMP ")
+                         .append(timestampOption)
+                         .append(" AND TTL ")
+                         .append(ttlOption);
+        }
+        else if (timestampOption.withTimestamp())
+        {
+            stringBuilder.append(" USING TIMESTAMP ")
+                         .append(timestampOption);
+        }
+        else if (ttlOption.withTTl())
+        {
+            stringBuilder.append(" USING TTL ")
+                    .append(ttlOption);
+        }
+        stringBuilder.append(";");
+        String insertStatement = stringBuilder.toString();
 
         LOGGER.info("CQL insert statement for the RDD {}", insertStatement);
         return insertStatement;
@@ -174,7 +215,7 @@ public class TableSchema implements Serializable
         switch (writeMode)
         {
             case INSERT:
-                validateDataframeFieldsInTable(tableInfo, dfFields);
+                validateDataframeFieldsInTable(tableInfo, dfFields, ttlOption, timestampOption);
                 return;
             case DELETE_PARTITION:
                 validateOnlyPartitionKeyColumnsInDataframe(tableInfo, dfFields);
@@ -204,12 +245,16 @@ public class TableSchema implements Serializable
                                             .collect(Collectors.joining(",")));
     }
 
-    private static void validateDataframeFieldsInTable(TableInfoProvider tableInfo, Set<String> dfFields)
+    private static void validateDataframeFieldsInTable(TableInfoProvider tableInfo, Set<String> dfFields,
+                                                       TTLOption ttlOption, TimestampOption timestampOption)
     {
         // Make sure all fields in DF schema are part of table
-        String unknownFields = dfFields.stream()
+        List<String> unknownFields = dfFields.stream()
                                        .filter(columnName -> !tableInfo.columnExists(columnName))
-                                       .collect(Collectors.joining(","));
+                                       .filter(columnName -> !columnName.equals(ttlOption.columnName()))
+                                       .filter(columnName -> !columnName.equals(timestampOption.columnName()))
+                                       .collect(Collectors.toList());
+
         Preconditions.checkArgument(unknownFields.isEmpty(), "Unknown fields in data frame => " + unknownFields);
     }
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
new file mode 100644
index 0000000..06307b9
--- /dev/null
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/TimestampOption.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+
+public final class TimestampOption implements Serializable
+{
+    private static final TimestampOption NOW = new TimestampOption(System.currentTimeMillis() * 1000);
+    private String timestampColumnName;
+    private Long timeStampInMicroSeconds;
+
+    private TimestampOption(String timestampColumnName)
+    {
+        this.timestampColumnName = timestampColumnName;
+    }
+
+    private TimestampOption(Long timeStampInMicroSeconds)
+    {
+        this.timeStampInMicroSeconds = timeStampInMicroSeconds;
+    }
+
+    public static TimestampOption from(String timestamp)
+    {
+        if (timestamp == null)
+        {
+            return NOW;
+        }
+        try
+        {
+            return new TimestampOption(Long.parseLong(timestamp));
+        }
+        catch (Exception e)
+        {
+
+            return new TimestampOption(timestamp);
+        }
+    }
+
+    /**
+     * Timestamp option for write with a constant timestamp. When same values for timestamp should be used for all rows in
+     * a bulk write call use this option.
+     *
+     * @param timestampInMicroSeconds timestamp value in microseconds
+     * @return timestamp option
+     */
+    public static String constant(long timestampInMicroSeconds)
+    {
+        return String.valueOf(timestampInMicroSeconds);
+    }
+
+    /**
+     * Timestamp option for write with a constant timestamp. When same values for timestamp should be used for all rows in
+     * a bulk write call use this option.
+     *
+     * @param duration timestamp value in Duration
+     * @return timestamp option
+     */
+    public static String constant(Duration duration)
+    {
+        return String.valueOf(duration.get(ChronoUnit.MICROS));
+    }
+
+    /**
+     * Timestamp option for writes with timestamp per Row. When different timestamp has to be used for different rows in
+     * a bulk write call use this option. It expects the input RDD to supply the timestamp values as an additional
+     * column at each row of the RDD. The timestamp value provider column is selected by {@code  timeStampColumnName}
+     *
+     * @param timeStampColumnName column name which has timestamp values for each row
+     * @return timestamp option
+     */
+    public static String perRow(String timeStampColumnName)
+    {
+        return timeStampColumnName;
+    }
+
+    public static TimestampOption now()
+    {
+        return NOW;
+    }
+
+    public String columnName()
+    {
+        return timestampColumnName;
+    }
+
+    public boolean withTimestamp()
+    {
+        return !this.equals(NOW)
+                && (timestampColumnName != null || timeStampInMicroSeconds != null);
+    }
+
+    @Override
+    public String toString()
+    {
+        if (timestampColumnName != null && !timestampColumnName.isEmpty())
+        {
+            return ":" + timestampColumnName;
+        }
+        if (timeStampInMicroSeconds != null)
+        {
+            return Long.toString(timeStampInMicroSeconds);
+        }
+        return null;
+    }
+}
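
For orientation (illustrative only, not part of the patch): because both option classes render a
per-row option as a named bind marker (":column") and a constant as a literal, the INSERT built by
TableSchema.getInsertStatement() above takes roughly the following shape for a hypothetical table
ks.tbl with columns id, course and marks:

    -- per-row options bound to "ttl" and "timestamp" DataFrame columns
    INSERT INTO ks.tbl (id,course,marks) VALUES (:id,:course,:marks) USING TIMESTAMP :timestamp AND TTL :ttl;
    -- constant TTL only, e.g. TTLOption.constant(20) with no timestamp set
    INSERT INTO ks.tbl (id,course,marks) VALUES (:id,:course,:marks) USING TTL 20;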
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 7b48df8..d824f05 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -43,5 +43,7 @@ public enum WriterOptions implements WriterOption
     TRUSTSTORE_BASE64_ENCODED,
     SIDECAR_PORT,
     ROW_BUFFER_MODE,
-    SSTABLE_DATA_SIZE_IN_MB
+    SSTABLE_DATA_SIZE_IN_MB,
+    TTL,
+    TIMESTAMP
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 262b7df..4cf29ca 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -85,6 +85,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
     private CommitResultSupplier crSupplier = (uuids, dc) -> new RemoteCommitResult(true, Collections.emptyList(), uuids, null);
 
     private Predicate<CassandraInstance> uploadRequestConsumer = instance -> true;
+    private TTLOption ttlOption = TTLOption.forever();
 
     public MockBulkWriterContext(CassandraRing<RingInstance> ring,
                                  String cassandraVersion,
@@ -111,6 +112,7 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo
                       .withPartitionKeyColumnTypes(partitionKeyColumnTypes)
                       .withWriteMode(WriteMode.INSERT)
                       .withDataFrameSchema(validDataFrameSchema)
+                      .withTTLSetting(ttlOption)
                       .build();
         this.jobId = java.util.UUID.randomUUID();
     }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
index 4003f54..716b7ee 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockTableWriter.java
@@ -24,6 +24,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Map;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.io.FileUtils;
@@ -64,13 +65,13 @@ public class MockTableWriter implements SSTableWriter
     }
 
     @Override
-    public void addRow(Object... values)
+    public void addRow(Map<String, Object> values) throws IOException
     {
         if (addRowThrows)
         {
             throw new RuntimeException("Failed to write because addRow throws");
         }
-        rows.add(values);
+        rows.add(values.values().toArray());
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index 4e60ec6..1ccdc72 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -48,6 +48,7 @@ public class RecordWriterTest
     public static final int REPLICA_COUNT = 3;
     public static final int FILES_PER_SSTABLE = 8;
     public static final int UPLOADED_TABLES = 3;
+    private static final String[] COLUMN_NAMES = {"id", "date", "course", "marks"};
     @Rule
     public TemporaryFolder folder = new TemporaryFolder();
 
@@ -75,17 +76,51 @@ public class RecordWriterTest
     @Test
     public void testSuccessfulWrite()
     {
-        rw = new RecordWriter(writerContext, () -> tc, SSTableWriter::new);
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
-        rw.write(data);
-        Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
-        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
-        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
-        List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
-        for (UploadRequest ur: requests)
-        {
-            assertNotNull(ur.fileHash);
-        }
+        validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
+    }
+
+    @Test
+    public void testWriteWithConstantTTL()
+    {
+        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
+        validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+    }
+
+    @Test
+    public void testWriteWithTTLColumn()
+    {
+        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, false);
+        String[] columnNamesWithTtl = {"id", "date", "course", "marks", "ttl"};
+        validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
+    }
+
+    @Test
+    public void testWriteWithConstantTimestamp()
+    {
+        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, false);
+        validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+    }
+
+    @Test
+    public void testWriteWithTimestampColumn()
+    {
+        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, false, true);
+        String[] columnNamesWithTimestamp = {"id", "date", "course", "marks", "timestamp"};
+        validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTimestamp);
+    }
+
+    @Test
+    public void testWriteWithTimestampAndTTLColumn()
+    {
+        MockBulkWriterContext bulkWriterContext =  new MockBulkWriterContext(ring);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true, true, true);
+        String[] columnNames = {"id", "date", "course", "marks", "ttl", "timestamp"};
+        validateSuccessfulWrite(bulkWriterContext, data, columnNames);
     }
 
     @Test
@@ -93,7 +128,7 @@ public class RecordWriterTest
     {
         // TODO: Add better error handling with human-readable exception messages in SSTableReader::new
         exception.expect(RuntimeException.class);
-        rw = new RecordWriter(writerContext, () -> tc, (wc, path) ->  new SSTableWriter(tw.setOutDir(path), path));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) ->  new SSTableWriter(tw.setOutDir(path), path));
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
         rw.write(data);
         Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
@@ -104,7 +139,7 @@ public class RecordWriterTest
     @Test
     public void testWriteWithOutOfRangeTokenFails()
     {
-        rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
         exception.expectMessage("Received Token 5765203080415074583 outside of expected range [-9223372036854775808‥0]");
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, false);
         rw.write(data);
@@ -113,7 +148,7 @@ public class RecordWriterTest
     @Test
     public void testAddRowThrowingFails()
     {
-        rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
         tw.setAddRowThrows(true);
         exception.expectMessage("java.lang.RuntimeException: Failed to write because addRow throws");
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
@@ -125,7 +160,7 @@ public class RecordWriterTest
     {
         // Mock context returns a 60-minute allowable time skew, so we use something just outside the limits
         long sixtyOneMinutesInMillis = TimeUnit.MINUTES.toMillis(61);
-        rw = new RecordWriter(writerContext, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder.getRoot().toPath()));
         writerContext.setTimeProvider(() -> System.currentTimeMillis() - sixtyOneMinutesInMillis);
         exception.expectMessage("Time skew between Spark and Cassandra is too large. Allowable skew is 60 minutes. Spark executor time is ");
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
@@ -138,16 +173,52 @@ public class RecordWriterTest
         // Mock context returns a 60-minute allowable time skew, so we use something just inside the limits
         long fiftyNineMinutesInMillis = TimeUnit.MINUTES.toMillis(59);
         long remoteTime = System.currentTimeMillis() - fiftyNineMinutesInMillis;
-        rw = new RecordWriter(writerContext, () -> tc, SSTableWriter::new);
+        rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new);
         writerContext.setTimeProvider(() -> remoteTime);  // Return a very low "current time" to make sure we fail if skew is too bad
         Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, true);
         rw.write(data);
     }
 
+    private void validateSuccessfulWrite(MockBulkWriterContext writerContext,
+                                         Iterator<Tuple2<DecoratedKey, Object[]>> data,
+                                         String[] columnNames)
+    {
+        RecordWriter rw = new RecordWriter(writerContext, columnNames, () -> tc, SSTableWriter::new);
+        rw.write(data);
+        Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads();
+        assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should upload to 3 replicas
+        assertThat(uploads.values().stream().mapToInt(List::size).sum(), is(REPLICA_COUNT * FILES_PER_SSTABLE * UPLOADED_TABLES));
+        List<UploadRequest> requests = uploads.values().stream().flatMap(List::stream).collect(Collectors.toList());
+        for (UploadRequest ur: requests)
+        {
+            assertNotNull(ur.fileHash);
+        }
+    }
+
     private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int numValues, boolean onlyInRange)
+    {
+        return generateData(numValues, onlyInRange, false, false);
+    }
+    private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(int numValues, boolean onlyInRange, boolean withTTL, boolean withTimestamp)
     {
         Stream<Tuple2<DecoratedKey, Object[]>> source = IntStream.iterate(0, integer -> integer + 1).mapToObj(index -> {
-            Object[] columns = {index, index, "foo" + index, index};
+            Object[] columns;
+            if (withTTL && withTimestamp)
+            {
+                columns = new Object[]{index, index, "foo" + index, index, index * 100, System.currentTimeMillis() * 1000};
+            }
+            else if (withTimestamp)
+            {
+                columns = new Object[]{index, index, "foo" + index, index, System.currentTimeMillis() * 1000};
+            }
+            else if (withTTL)
+            {
+                columns = new Object[]{index, index, "foo" + index, index, index * 100};
+            }
+            else
+            {
+                columns = new Object[]{index, index, "foo" + index, index};
+            }
             return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns);
         });
         if (onlyInRange)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
index 8bee484..3b3511c 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterTest.java
@@ -27,6 +27,7 @@ import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -90,7 +91,7 @@ public class SSTableWriterTest
     {
         MockBulkWriterContext writerContext = new MockBulkWriterContext(ring, version, ConsistencyLevel.CL.LOCAL_QUORUM);
         SSTableWriter tw = new SSTableWriter(writerContext, tmpDir.getRoot().toPath());
-        tw.addRow(BigInteger.ONE, new Object[]{1, 1, "foo", 1});
+        tw.addRow(BigInteger.ONE, ImmutableMap.of("id", 1, "date", 1, "course", "foo", "marks", 1));
         tw.close(writerContext, 1);
         try (DirectoryStream<Path> dataFileStream = Files.newDirectoryStream(tw.getOutDir(), "*Data.db"))
         {
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 0c6d5db..eb69631 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -24,6 +24,7 @@ import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -67,6 +68,7 @@ public class StreamSessionConsistencyTest
                                                                                ImmutableMap.of("DC1", 3, "DC2", 3),
                                                                                "test",
                                                                                6);
+    private static final Map<String, Object> COLUMN_BIND_VALUES = ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
 
     @Rule
     public TemporaryFolder folder = new TemporaryFolder();
@@ -119,8 +121,7 @@ public class StreamSessionConsistencyTest
             }
         });
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES);
         tr.close(writerContext, 1);
         streamSession.scheduleStream(tr);
         if (shouldFail)
@@ -157,8 +158,7 @@ public class StreamSessionConsistencyTest
         boolean shouldFail = calculateFailure(dc1Failures.get(), dc2Failures.get());
         writerContext.setUploadSupplier(instance -> dcFailures.get(instance.getDataCenter()).getAndDecrement() <= 0);
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BIND_VALUES);
         tr.close(writerContext, 1);
         streamSession.scheduleStream(tr);
         if (shouldFail)
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
index bf79c31..e5e5ab1 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionTest.java
@@ -23,11 +23,13 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.BoundType;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import org.junit.Before;
@@ -52,6 +54,7 @@ import static org.junit.Assert.assertTrue;
 public class StreamSessionTest
 {
     public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to load 1 ranges with LOCAL_QUORUM";
+    private static final Map<String, Object> COLUMN_BOUND_VALUES = ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
     @Rule
     public TemporaryFolder folder = new TemporaryFolder();
     private static final int FILES_PER_SSTABLE = 8;
@@ -90,8 +93,7 @@ public class StreamSessionTest
             ) throws IOException, ExecutionException, InterruptedException
     {
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         ss.close();  // Force "execution" of futures
@@ -121,8 +123,7 @@ public class StreamSessionTest
     public void testMismatchedTokenRangeFails() throws IOException
     {
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(9999L), row);
+        tr.addRow(BigInteger.valueOf(9999L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         IllegalStateException illegalStateException = 
assertThrows(IllegalStateException.class,
                                                       () -> 
ss.scheduleStream(tr));
@@ -170,8 +171,7 @@ public class StreamSessionTest
     {
         assertThrows(RuntimeException.class, () -> {
             SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
tableWriter.getOutDir());
-            Object[] row = {0, 1, "course", 2};
-            tr.addRow(BigInteger.valueOf(102L), row);
+            tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
             tr.close(writerContext, 1);
             tableWriter.removeOutDir();
             ss.scheduleStream(tr);
@@ -190,8 +190,7 @@ public class StreamSessionTest
         writerContext.setInstancesAreAvailable(false);
         ss = new StreamSession(writerContext, "sessionId", range, executor);
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         assertThrows(LOAD_RANGE_ERROR_PREFIX, RuntimeException.class, () -> 
ss.close());
@@ -234,8 +233,7 @@ public class StreamSessionTest
             }
         });
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         ss.close();  // Force "execution" of futures
@@ -267,8 +265,7 @@ public class StreamSessionTest
             }
         });
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         RuntimeException exception = assertThrows(RuntimeException.class, () 
-> ss.close());  // Force "execution" of futures
@@ -289,8 +286,7 @@ public class StreamSessionTest
     {
         writerContext.setUploadSupplier(instance -> false);
         SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, 
folder.getRoot().toPath());
-        Object[] row = {0, 1, "course", 2};
-        tr.addRow(BigInteger.valueOf(102L), row);
+        tr.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         tr.close(writerContext, 1);
         ss.scheduleStream(tr);
         assertThrows(LOAD_RANGE_ERROR_PREFIX, RuntimeException.class, () -> 
ss.close());
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
index c045492..895ca06 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTest.java
@@ -72,7 +72,44 @@ public class TableSchemaTest
         TableSchema schema = getValidSchemaBuilder()
                 .build();
         assertThat(schema.modificationStatement,
-                   is(equalTo("INSERT INTO test.test (id,date,course,marks) 
VALUES (?,?,?,?);")));
+                   is(equalTo("INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks);")));
+    }
+
+    @Test
+    public void testInsertStatementWithConstantTTL()
+    {
+        TableSchema schema = 
getValidSchemaBuilder().withTTLSetting(TTLOption.from("1000")).build();
+        assertThat(schema.modificationStatement, is(equalTo("INSERT INTO 
test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 
1000;")));
+    }
+
+    @Test
+    public void testInsertStatementWithTTLColumn()
+    {
+        TableSchema schema = 
getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).build();
+        assertThat(schema.modificationStatement, is(equalTo("INSERT INTO 
test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 
:ttl;")));
+    }
+
+    @Test
+    public void testInsertStatementWithConstantTimestamp()
+    {
+        TableSchema schema = 
getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("1000")).build();
+        String expectedQuery = "INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks) USING TIMESTAMP 1000;";
+        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+    }
+
+    @Test
+    public void testInsertStatementWithTimestampColumn()
+    {
+        TableSchema schema = 
getValidSchemaBuilder().withTimeStampSetting(TimestampOption.from("timestamp")).build();
+        String expectedQuery = "INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp;";
+        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
+    }
+    @Test
+    public void testInsertStatementWithTTLAndTimestampColumn()
+    {
+        TableSchema schema = 
getValidSchemaBuilder().withTTLSetting(TTLOption.from("ttl")).withTimeStampSetting(TimestampOption.from("timestamp")).build();
+        String expectedQuery = "INSERT INTO test.test (id,date,course,marks) 
VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp AND TTL :ttl;";
+        assertThat(schema.modificationStatement, is(equalTo(expectedQuery)));
     }
 
     @Test
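
For quick reference, the two option styles exercised in the new tests compose as follows. This is a minimal sketch using only the test helpers and factories shown in this patch (getValidSchemaBuilder, TTLOption.from, TimestampOption.from); the expected statements are taken verbatim from the assertions above:

    // Constant TTL: the literal value is inlined into the USING clause.
    TableSchema constantTtl = getValidSchemaBuilder()
            .withTTLSetting(TTLOption.from("1000"))
            .build();
    // constantTtl.modificationStatement:
    //   INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TTL 1000;

    // Per-row TTL and timestamp: the configured column names become named bind markers.
    TableSchema perRow = getValidSchemaBuilder()
            .withTTLSetting(TTLOption.from("ttl"))
            .withTimeStampSetting(TimestampOption.from("timestamp"))
            .build();
    // perRow.modificationStatement:
    //   INSERT INTO test.test (id,date,course,marks) VALUES (:id,:date,:course,:marks) USING TIMESTAMP :timestamp AND TTL :ttl;
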
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
index f393f35..18ce2a6 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TableSchemaTestCommon.java
@@ -32,6 +32,7 @@ import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.cassandra.spark.common.schema.ColumnType;
 import org.apache.cassandra.spark.data.CqlField;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 
@@ -151,6 +152,8 @@ public final class TableSchemaTestCommon
         private ColumnType[] partitionKeyColumnTypes;
         private StructType dataFrameSchema;
         private WriteMode writeMode = null;
+        private TTLOption ttlOption = TTLOption.forever();
+        private TimestampOption timestampOption = TimestampOption.now();
 
         public MockTableSchemaBuilder withCqlColumns(@NotNull Map<String, 
CqlField.CqlType> cqlColumns)
         {
@@ -207,6 +210,18 @@ public final class TableSchemaTestCommon
             return this;
         }
 
+        public MockTableSchemaBuilder withTTLSetting(TTLOption ttlOption)
+        {
+            this.ttlOption = ttlOption;
+            return this;
+        }
+
+        public MockTableSchemaBuilder withTimeStampSetting(TimestampOption 
timestampOption)
+        {
+            this.timestampOption = timestampOption;
+            return this;
+        }
+
         public TableSchema build()
         {
             Objects.requireNonNull(cqlColumns,
@@ -228,7 +243,15 @@
                                                                                partitionKeyColumnTypes,
                                                                                primaryKeyColumnNames,
                                                                                cassandraVersion);
-            return new TableSchema(dataFrameSchema, tableInfoProvider, 
writeMode);
+            if (ttlOption.withTTl() && ttlOption.columnName() != null)
+            {
+                dataFrameSchema = dataFrameSchema.add("ttl", 
DataTypes.IntegerType);
+            }
+            if (timestampOption.withTimestamp() && 
timestampOption.columnName() != null)
+            {
+                dataFrameSchema = dataFrameSchema.add("timestamp", 
DataTypes.IntegerType);
+            }
+            return new TableSchema(dataFrameSchema, tableInfoProvider, 
writeMode, ttlOption, timestampOption);
         }
     }
 
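The builder change above also shows how per-row options surface in the Spark schema: when TTLOption or TimestampOption is configured with a column name, a matching IntegerType column is appended to the dataframe schema. A sketch of the resulting schema, mirroring the builder code and test fixture columns (the base column types are assumed from the test bind values):

    StructType schema = new StructType()
            .add("id", DataTypes.IntegerType)
            .add("date", DataTypes.IntegerType)
            .add("course", DataTypes.StringType)
            .add("marks", DataTypes.IntegerType)
            .add("ttl", DataTypes.IntegerType)         // appended only with TTLOption.from("ttl")
            .add("timestamp", DataTypes.IntegerType);  // appended only with TimestampOption.from("timestamp")
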
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
index 9ae1d52..9ec752d 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriter.java
@@ -21,8 +21,9 @@ package org.apache.cassandra.bridge;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
 
 public interface SSTableWriter extends Closeable
 {
-    void addRow(Object... values) throws IOException;
+    void addRow(Map<String, Object> values) throws IOException;
 }
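
With this interface change, callers no longer pass positional varargs; they supply named column bindings keyed by CQL column name, matching the :name bind markers generated by TableSchema. A minimal sketch of a caller, assuming a writer instance obtained elsewhere; the column names and values mirror the test fixtures in this patch, and the commented ttl/timestamp entries are hypothetical, applying only when per-row options are configured:

    import java.io.IOException;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.cassandra.bridge.SSTableWriter;

    final class AddRowSketch
    {
        static void writeOneRow(SSTableWriter writer) throws IOException
        {
            Map<String, Object> row = new LinkedHashMap<>();  // column name -> bound value
            row.put("id", 0);
            row.put("date", 1);
            row.put("course", "course");
            row.put("marks", 2);
            // Optional per-row entries when TTL/timestamp columns are configured:
            // row.put("ttl", 1000);
            // row.put("timestamp", 1686859861);
            writer.addRow(row);
        }
    }
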
diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
index c53fc25..05fb8bd 100644
--- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
+++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.bridge;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.dht.IPartitioner;
@@ -62,7 +63,7 @@ public class SSTableWriterImplementation implements 
SSTableWriter
     }
 
     @Override
-    public void addRow(Object... values) throws IOException
+    public void addRow(Map<String, Object> values) throws IOException
     {
         try
         {

