This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5dcb065  CASSANALYTICS-18: Support CQL duration type (#102)
5dcb065 is described below

commit 5dcb065dc8de0582f6e72b137798f2b883540f73
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Tue Mar 18 18:07:35 2025 +0100

    CASSANALYTICS-18: Support CQL duration type (#102)
    
    Patch by Lukasz Antoniak; Reviewed by Francisco Guerrero, James Berragan, 
Yifan Cai for CASSANALYTICS-18
---
 CHANGES.txt                                        |   1 +
 .../java/org/apache/cassandra/cdc/CdcTests.java    |   7 +
 .../cassandra/cdc/CollectionDeletionTests.java     |   1 +
 .../cassandra/cdc/PartitionDeletionTests.java      |   1 +
 .../apache/cassandra/cdc/RangeDeletionTests.java   |   1 +
 .../cassandra/bridge/type/InternalDuration.java    |  78 +++++++++++
 .../org/apache/cassandra/spark/data/CqlField.java  |  26 +++-
 .../bridge/type/InternalDurationTest.java          |  24 ++--
 .../cassandra/spark/bulkwriter/RecordWriter.java   |   2 +-
 .../spark/bulkwriter/SqlToCqlTypeConverter.java    |  35 ++++-
 .../org/apache/cassandra/spark/EndToEndTests.java  |  17 +++
 .../bulkwriter/SqlToCqlTypeConverterTest.java      |   2 +
 .../spark/reader/DataTypeSerializationTests.java   |  85 +++++++-----
 .../spark/sparksql/SparkRowIteratorTests.java      |   6 +
 .../analytics/BulkWriteDataTypesTest.java          |  18 +++
 .../spark/data/converter/types/SparkDuration.java  |  51 ++++++-
 .../cassandra/spark/utils/SparkTypeUtils.java      |  55 ++++++++
 .../cassandra/spark/utils/SparkTypeUtils.java      |  55 ++++++++
 .../cassandra/spark/utils/SparkTypeUtils.java      |  50 +++++++
 .../cassandra/spark/utils/SparkTypeUtils.java      |  50 +++++++
 .../cassandra/spark/utils/test/TestSchema.java     |   6 +-
 .../spark/data/converter/types/DateTypeTests.java  |   2 +-
 .../cassandra/spark/reader/PartitionKeyTests.java  |  79 ++++++-----
 .../apache/cassandra/spark/data/NativeType.java    |   2 +-
 .../cassandra/spark/data/complex/CqlFrozen.java    |   4 +-
 .../cassandra/spark/data/complex/CqlList.java      |   4 +-
 .../cassandra/spark/data/complex/CqlMap.java       |   6 +-
 .../cassandra/spark/data/complex/CqlSet.java       |   4 +-
 .../cassandra/spark/data/complex/CqlTuple.java     |   2 +-
 .../cassandra/spark/data/complex/CqlUdt.java       |   4 +-
 .../apache/cassandra/spark/data/types/Date.java    |   2 +-
 .../cassandra/spark/data/types/Duration.java       | 150 ++++++++++++++++++++-
 32 files changed, 725 insertions(+), 105 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 086a5e4..3e7e9b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Support CQL duration type (CASSANALYTICS-18)
  * Add Schema Store interfaces for CDC (CASSANALYTICS-8)
  * Add CassandraBridge helper APIs that can be used by external tooling 
(CASSANALYTICS-4)
  * Refactor to decouple RowIterator and CellIterator from Spark so bulk reads 
can be performed outside of Spark (CASSANDRA-20259)
diff --git 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java
index 38c59db..28c29c7 100644
--- 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java
+++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java
@@ -342,6 +342,7 @@ public class CdcTests
     public void testClusteringKey()
     {
         qt().forAll(cql3Type(BRIDGE))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(type ->
                          testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
                                                                
.withPartitionKey("pk", BRIDGE.uuid())
@@ -371,6 +372,9 @@ public class CdcTests
     public void testMultipleClusteringKeys()
     {
         qt().withExamples(50).forAll(cql3Type(BRIDGE), cql3Type(BRIDGE), 
cql3Type(BRIDGE))
+            .assuming((t1, t2, t3) -> t1.supportedAsPrimaryKeyColumn()
+                                      && t2.supportedAsPrimaryKeyColumn()
+                                      && t3.supportedAsPrimaryKeyColumn())
             .checkAssert(
             (t1, t2, t3) ->
             testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
@@ -407,6 +411,7 @@ public class CdcTests
     public void testSet()
     {
         qt().forAll(cql3Type(BRIDGE))
+            .assuming(CqlField.CqlType::supportedAsSetElement)
             .checkAssert(
             t -> testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
                                                        .withPartitionKey("pk", 
BRIDGE.uuid())
@@ -487,6 +492,7 @@ public class CdcTests
     public void testMap()
     {
         qt().withExamples(50).forAll(cql3Type(BRIDGE), cql3Type(BRIDGE))
+            .assuming((t1, t2) -> t1.supportedAsMapKey() && 
t2.supportedAsMapKey())
             .checkAssert(
             (t1, t2) -> testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
                                                               
.withPartitionKey("pk", BRIDGE.uuid())
@@ -766,6 +772,7 @@ public class CdcTests
     public void testCompositePartitionKey()
     {
         qt().forAll(cql3Type(BRIDGE))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(
             type ->
             testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
diff --git 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java
 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java
index b736bb1..9f3decb 100644
--- 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java
+++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java
@@ -98,6 +98,7 @@ public class CollectionDeletionTests
         final long minTimestamp = System.currentTimeMillis();
         final int numRows = 1000;
         qt().forAll(cql3Type(BRIDGE))
+            .assuming(CqlField.CqlType::supportedAsMapKey)
             .checkAssert(
             type -> testWith(BRIDGE, directory, schemaBuilder.apply(type))
                     .withAddLastModificationTime(true)
diff --git 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/PartitionDeletionTests.java
 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/PartitionDeletionTests.java
index 52496cc..9ae344d 100644
--- 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/PartitionDeletionTests.java
+++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/PartitionDeletionTests.java
@@ -105,6 +105,7 @@ public class PartitionDeletionTests
         final long minTimestamp = System.currentTimeMillis();
         final int numRows = 1000;
         qt().forAll(cql3Type(BRIDGE))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(type -> {
                 testWith(BRIDGE, directory, schemaBuilder.apply(type))
                 .withAddLastModificationTime(true)
diff --git 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/RangeDeletionTests.java
 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/RangeDeletionTests.java
index e83a88b..0cd9937 100644
--- 
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/RangeDeletionTests.java
+++ 
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/RangeDeletionTests.java
@@ -110,6 +110,7 @@ public class RangeDeletionTests
         long minTimestamp = System.currentTimeMillis();
         int numRows = 1000;
         qt().forAll(cql3Type(BRIDGE))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(
             type ->
             testWith(BRIDGE, directory, schemaBuilder.apply(type))
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/type/InternalDuration.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/type/InternalDuration.java
new file mode 100644
index 0000000..b2ac781
--- /dev/null
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/type/InternalDuration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge.type;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Analytics-internal Duration type to bridge CQL Duration type.
+ */
+public class InternalDuration implements Serializable
+{
+    private static final long serialVersionUID = -6867844786318937949L;
+    public final int months;
+    public final int days;
+    public final long nanoseconds;
+
+    public InternalDuration(int months, int days, long nanoseconds)
+    {
+        this.months = months;
+        this.days = days;
+        this.nanoseconds = nanoseconds;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        InternalDuration that = (InternalDuration) o;
+        return months == that.months &&
+               days == that.days &&
+               nanoseconds == that.nanoseconds;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(months, days, nanoseconds);
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder builder = new StringBuilder();
+        if (this.months < 0 || this.days < 0 || this.nanoseconds < 0L)
+        {
+            builder.append('-');
+        }
+        builder.append("mo").append(Math.abs(this.months));
+        builder.append("d").append(Math.abs(this.days));
+        builder.append("ns").append(Math.abs(this.nanoseconds));
+        return builder.toString();
+    }
+}
diff --git 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java
 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java
index 05523ce..1c15fad 100644
--- 
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java
+++ 
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java
@@ -109,6 +109,30 @@ public class CqlField implements Serializable, 
Comparable<CqlField>
             return false;
         }
 
+        /**
+         * @return true if type can be part of primary key
+         */
+        default boolean supportedAsPrimaryKeyColumn()
+        {
+            return true;
+        }
+
+        /**
+         * @return true if type can be used as map key
+         */
+        default boolean supportedAsMapKey()
+        {
+            return true;
+        }
+
+        /**
+         * @return true if type can be used as set element
+         */
+        default boolean supportedAsSetElement()
+        {
+            return true;
+        }
+
         default Object deserializeToType(TypeConverter converter, ByteBuffer 
buffer)
         {
             return deserializeToType(converter, buffer, isFrozen());
@@ -153,7 +177,7 @@ public class CqlField implements Serializable, 
Comparable<CqlField>
         Object randomValue(int minCollectionSize);
 
         @VisibleForTesting
-        Object convertForCqlWriter(Object value, CassandraVersion version);
+        Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement);
 
         // Kryo Serialization
 
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
 
b/cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/type/InternalDurationTest.java
similarity index 63%
copy from 
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
copy to 
cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/type/InternalDurationTest.java
index 1e8ca98..d1c1c19 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
+++ 
b/cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/type/InternalDurationTest.java
@@ -17,23 +17,21 @@
  * under the License.
  */
 
-package org.apache.cassandra.spark.data.types;
+package org.apache.cassandra.bridge.type;
 
-import org.apache.cassandra.spark.data.NativeType;
+import org.junit.jupiter.api.Test;
 
-public class Duration extends NativeType
-{
-    public static final Duration INSTANCE = new Duration();
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-    @Override
-    public String name()
+public class InternalDurationTest
+{
+    @Test
+    public void testDurationToString()
     {
-        return "duration";
-    }
+        InternalDuration duration = new InternalDuration(1, 2, 123456789123L);
+        assertEquals("mo1d2ns123456789123", duration.toString());
 
-    @Override
-    public boolean isSupported()
-    {
-        return false;
+        duration = new InternalDuration(0, -3, 987L);
+        assertEquals("-mo0d3ns987", duration.toString());
     }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 83832fd..633e7ef 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -400,7 +400,7 @@ public class RecordWriter
                     udtValue.udtMap.put(entry.getKey(), 
maybeConvertUdt(entry.getValue()));
                 }
             }
-            return getUdt(udtValue.name).convertForCqlWriter(udtValue.udtMap, 
writerContext.bridge().getVersion());
+            return getUdt(udtValue.name).convertForCqlWriter(udtValue.udtMap, 
writerContext.bridge().getVersion(), false);
         }
         return value;
     }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
index d148de7..b7b8392 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
@@ -41,10 +41,13 @@ import com.google.common.net.InetAddresses;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.bridge.type.InternalDuration;
 import org.apache.cassandra.spark.data.BridgeUdtValue;
 import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.SparkTypeUtils;
 import org.apache.cassandra.spark.utils.UUIDs;
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.unsafe.types.CalendarInterval;
 import scala.Tuple2;
 
 import static 
org.apache.cassandra.spark.utils.ScalaConversionUtils.asJavaIterable;
@@ -61,6 +64,7 @@ public final class SqlToCqlTypeConverter implements 
Serializable
     public static final String DATE = "date";
     public static final String DECIMAL = "decimal";
     public static final String DOUBLE = "double";
+    public static final String DURATION = "duration";
     public static final String FLOAT = "float";
     public static final String FROZEN = "frozen";
     public static final String INET = "inet";
@@ -94,6 +98,7 @@ public final class SqlToCqlTypeConverter implements 
Serializable
     private static final TimeUUIDConverter TIME_UUID_CONVERTER = new 
TimeUUIDConverter();
     private static final InetAddressConverter INET_ADDRESS_CONVERTER = new 
InetAddressConverter();
     public static final DateConverter DATE_CONVERTER = new DateConverter();
+    public static final DurationConverter DURATION_CONVERTER = new 
DurationConverter();
 
     private SqlToCqlTypeConverter()
     {
@@ -127,6 +132,8 @@ public final class SqlToCqlTypeConverter implements 
Serializable
                 return BIG_DECIMAL_CONVERTER;
             case DOUBLE:
                 return NO_OP_CONVERTER;
+            case DURATION:
+                return DURATION_CONVERTER;
             case FLOAT:
                 return NO_OP_CONVERTER;
             case FROZEN:
@@ -213,7 +220,7 @@ public final class SqlToCqlTypeConverter implements 
Serializable
         }
     }
 
-    abstract static class Converter<T> implements Serializable
+    public abstract static class Converter<T> implements Serializable
     {
         public T convert(Object object)
         {
@@ -547,6 +554,32 @@ public final class SqlToCqlTypeConverter implements 
Serializable
         }
     }
 
+    public static class DurationConverter extends 
NullableConverter<InternalDuration>
+    {
+        @Override
+        public InternalDuration convertInternal(Object object)
+        {
+            // TODO: Support conversion from ISO8601 duration representation 
and standard form (check Java driver).
+            // Currently Spark does not support persisting CalendarInterval, 
so this is optional.
+            // Test simpleDurationSchemaSetup() shall fail when Spark 
supports it.
+            if (object instanceof CalendarInterval)
+            {
+                CalendarInterval cl = (CalendarInterval) object;
+                return SparkTypeUtils.convertDuration(cl);
+            }
+            else
+            {
+                throw new RuntimeException("Unsupported conversion for 
DURATION from " + object.getClass().getTypeName());
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Duration";
+        }
+    }
+
     public static class TimeConverter extends NullableConverter<Long>
     {
         @Override
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
index 232ecb7..8195dad 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
@@ -155,6 +155,7 @@ public class EndToEndTests
     public void testSingleClusteringKeyOrderBy(CassandraBridge bridge)
     {
         qt().forAll(TestUtils.cql3Type(bridge), TestUtils.sortOrder())
+            .assuming((clusteringKeyType, sortOrder) -> 
clusteringKeyType.supportedAsPrimaryKeyColumn())
             .checkAssert((clusteringKeyType, sortOrder) ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("a", 
bridge.bigint())
@@ -206,6 +207,7 @@ public class EndToEndTests
     {
         // Test partition key can be read for all data types
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(partitionKeyType -> {
                 // Boolean or empty types have limited cardinality
                 int numRows = partitionKeyType.cardinality(10);
@@ -885,6 +887,7 @@ public class EndToEndTests
     public void testSet(CassandraBridge bridge)
     {
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsSetElement)
             .checkAssert(type ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -914,6 +917,7 @@ public class EndToEndTests
     {
         qt().withExamples(50)  // Limit number of tests otherwise n x n tests 
takes too long
             .forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+            .assuming((keyType, valueType) -> keyType.supportedAsMapKey())
             .checkAssert((keyType, valueType) ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -943,6 +947,7 @@ public class EndToEndTests
     {
         // pk -> a frozen<set<?>>
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsSetElement)
             .checkAssert(type ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -974,6 +979,7 @@ public class EndToEndTests
         // pk -> a frozen<map<?, ?>>
         qt().withExamples(50)  // Limit number of tests otherwise n x n tests 
takes too long
             .forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+            .assuming((keyType, valueType) -> keyType.supportedAsMapKey())
             .checkAssert((keyType, valueType) ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -1178,6 +1184,7 @@ public class EndToEndTests
     {
         // pk -> a testudt<b text, c frozen<type>, d int>
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsSetElement)
             .checkAssert(type ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -1215,6 +1222,7 @@ public class EndToEndTests
         // pk -> a testudt<b float, c frozen<set<uuid>>, d frozen<map<type1, 
type2>>, e boolean>
         qt().withExamples(50)
             .forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+            .assuming((type1, type2) -> type1.supportedAsMapKey())
             .checkAssert((type1, type2) ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -1235,6 +1243,7 @@ public class EndToEndTests
         // pk -> col1 udt1<a float, b frozen<set<uuid>>, c frozen<set<type>>, 
d boolean>,
         //       col2 udt2<a text, b bigint, g varchar>, col3 udt3<int, type, 
ascii>
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsSetElement)
             .checkAssert(type ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -1305,6 +1314,7 @@ public class EndToEndTests
         // pk -> col1 type1 -> a tuple<int, type2, bigint>
         qt().withExamples(10)
             .forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+            .assuming((type1, type2) -> type1.supportedAsPrimaryKeyColumn())
             .checkAssert((type1, type2) ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -1346,6 +1356,7 @@ public class EndToEndTests
         // Test set nested within tuple
         qt().withExamples(10)
             .forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsSetElement)
             .checkAssert(type ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -1390,6 +1401,7 @@ public class EndToEndTests
         // Test map nested within tuple
         qt().withExamples(10)
             .forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+            .assuming((type1, type2) -> type1.supportedAsMapKey())
             .checkAssert((type1, type2) ->
                 Tester.builder(TestSchema.builder(bridge)
                                         .withPartitionKey("pk", bridge.uuid())
@@ -1431,6 +1443,7 @@ public class EndToEndTests
         // Test tuple nested within set
         qt().withExamples(10)
             .forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsSetElement)
             .checkAssert(type ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -1510,6 +1523,7 @@ public class EndToEndTests
     public void testTupleClusteringKey(CassandraBridge bridge)
     {
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(type ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -1529,6 +1543,7 @@ public class EndToEndTests
     public void testUdtClusteringKey(CassandraBridge bridge)
     {
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(type ->
                 Tester.builder(TestSchema.builder(bridge)
                                          .withPartitionKey("pk", bridge.uuid())
@@ -2564,6 +2579,7 @@ public class EndToEndTests
     public void 
testSinglePartitionAndClusteringKeyWithNullValueColumn(CassandraBridge bridge)
     {
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(clusteringKeyType ->
                          
Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.bigint())
                                                   .withClusteringKey("b", 
clusteringKeyType)
@@ -2578,6 +2594,7 @@ public class EndToEndTests
     public void testMultipleValueColumnsWithNullValueColumn(CassandraBridge 
bridge)
     {
         qt().forAll(TestUtils.cql3Type(bridge))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
             .checkAssert(clusteringKeyType ->
                          
Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.bigint())
                                                   .withClusteringKey("b", 
clusteringKeyType)
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java
index c72bddd..dab3f79 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java
@@ -35,6 +35,7 @@ import static 
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.COUNTE
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
 import static 
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DECIMAL;
 import static 
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DOUBLE;
+import static 
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DURATION;
 import static 
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.FLOAT;
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INET;
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
@@ -72,6 +73,7 @@ public final class SqlToCqlTypeConverterTest
                              na(mockCqlType(TIMEUUID), 
SqlToCqlTypeConverter.TimeUUIDConverter.class),
                              na(mockCqlType(INET), 
SqlToCqlTypeConverter.InetAddressConverter.class),
                              na(mockCqlType(DATE), 
SqlToCqlTypeConverter.DateConverter.class),
+                             na(mockCqlType(DURATION), 
SqlToCqlTypeConverter.DurationConverter.class),
                              na(mockMapCqlType(INT, INT), 
SqlToCqlTypeConverter.MapConverter.class),
                              na(mockSetCqlType(INET), 
SqlToCqlTypeConverter.SetConverter.class),
                              na(mockUdtCqlType("udtType", "f1", TEXT, "f2", 
INT, "f3", TIMEUUID), SqlToCqlTypeConverter.UdtConverter.class),
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java
index cb81db3..6a91bf2 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java
@@ -39,14 +39,18 @@ import java.util.stream.IntStream;
 import org.junit.jupiter.api.Test;
 
 import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.type.InternalDuration;
 import org.apache.cassandra.spark.TestUtils;
+import org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.converter.types.SparkType;
 import org.apache.cassandra.spark.utils.RandomUtils;
+import org.apache.cassandra.spark.utils.SparkTypeUtils;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import static org.apache.cassandra.bridge.CassandraBridgeFactory.getSparkSql;
@@ -294,6 +298,18 @@ public class DataTypeSerializationTests
         });
     }
 
+    @Test
+    public void testDuration()
+    {
+        qt().forAll(TestUtils.bridges()).checkAssert(bridge -> {
+            CalendarInterval value = SparkTypeUtils.convertDuration(new 
InternalDuration(1, 2, 7000000000L));
+            Object converted = 
SqlToCqlTypeConverter.getConverter(bridge.duration()).convert(value);
+            Object deserialized = toDuration(bridge, converted);
+            assertInstanceOf(CalendarInterval.class, deserialized);
+            assertEquals(value, deserialized);
+        });
+    }
+
     @Test
     public void testTimestamp()
     {
@@ -425,7 +441,7 @@ public class DataTypeSerializationTests
     public void testSet()
     {
         runTest((partitioner, directory, bridge) ->
-                qt().forAll(TestUtils.cql3Type(bridge)).checkAssert(type -> {
+                
qt().forAll(TestUtils.cql3Type(bridge)).assuming(CqlField.CqlType::supportedAsSetElement).checkAssert(type
 -> {
                     CqlField.CqlSet set = bridge.set(type);
                     SparkType sparkType = 
getSparkSql(bridge).toSparkType(type);
                     Set<Object> expected = IntStream.range(0, 128)
@@ -445,38 +461,40 @@ public class DataTypeSerializationTests
    public void testMap()
    {
        // Serializes a randomly generated map for every (key, value) CQL type pair, deserializes
        // it back through the Spark SQL representation and verifies entries survive the round trip.
        // Key types that Cassandra does not allow as map keys (e.g. duration) are filtered out.
        runTest((partitioner, directory, bridge) ->
                qt().forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
                    .assuming((keyType, valueType) -> keyType.supportedAsMapKey())
                    .checkAssert((keyType, valueType) -> {
                        CqlField.CqlMap map = bridge.map(keyType, valueType);
                        SparkType keySparkType = getSparkSql(bridge).toSparkType(keyType);
                        SparkType valueSparkType = getSparkSql(bridge).toSparkType(valueType);

                        // Cap the entry count by the key type's cardinality (e.g. boolean keys allow only 2 entries)
                        int count = keyType.cardinality(128);
                        Map<Object, Object> expected = new HashMap<>(count);
                        for (int entry = 0; entry < count; entry++)
                        {
                            // Re-draw until the random key is unique so the map reaches exactly 'count' entries
                            Object key = null;
                            while (key == null || expected.containsKey(key))
                            {
                                key = keyType.randomValue();
                            }
                            expected.put(key, valueType.randomValue());
                        }
                        ByteBuffer buffer = map.serialize(expected);
                        ArrayBasedMapData mapData = ((ArrayBasedMapData) map.deserializeToType(getSparkSql(bridge), buffer));
                        ArrayData keys = mapData.keyArray();
                        ArrayData values = mapData.valueArray();
                        // Convert the parallel key/value arrays back into test-row values for comparison
                        Map<Object, Object> actual = new HashMap<>(keys.numElements());
                        for (int index = 0; index < keys.numElements(); index++)
                        {
                            Object key = keySparkType.toTestRowType(keys.get(index, getSparkSql(bridge).sparkSqlType(keyType)));
                            Object value = valueSparkType.toTestRowType(values.get(index, getSparkSql(bridge).sparkSqlType(valueType)));
                            actual.put(key, value);
                        }
                        assertEquals(expected.size(), actual.size());
                        for (Map.Entry<Object, Object> entry : expected.entrySet())
                        {
                            assertEquals(entry.getValue(), actual.get(entry.getKey()));
                        }
                    }));
    }
 
@@ -617,6 +635,11 @@ public class DataTypeSerializationTests
         return toNative(bridge, CassandraBridge::time, value);
     }
 
    // Round-trips 'value' through the bridge's CQL duration type serializer and returns the native result.
    static Object toDuration(CassandraBridge bridge, Object value)
    {
        return toNative(bridge, CassandraBridge::duration, value);
    }
+
     static Object toTimestamp(CassandraBridge bridge, Object value)
     {
         return toNative(bridge, CassandraBridge::timestamp, value);
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
index 22101e0..7df0cb8 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
@@ -63,6 +63,7 @@ public class SparkRowIteratorTests
     {
         // I.e. "create table keyspace.table (a %s, b %s, primary key(a));"
         qt().forAll(TestUtils.versions(), TestUtils.cql3Type(bridge), 
TestUtils.cql3Type(bridge))
+            .assuming((version, type1, type2) -> 
type1.supportedAsPrimaryKeyColumn())
             .checkAssert((version, type1, type2) -> runTest(version, 
TestSchema.builder(bridge)
                     .withPartitionKey("a", type1)
                     .withColumn("b", type2)
@@ -74,6 +75,9 @@ public class SparkRowIteratorTests
     public void testMultiPartitionKeys(CassandraBridge bridge)
     {
         qt().forAll(TestUtils.versions(), TestUtils.cql3Type(bridge), 
TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+            .assuming((version, type1, type2, type3) -> 
type1.supportedAsPrimaryKeyColumn()
+                                                        && 
type2.supportedAsPrimaryKeyColumn()
+                                                        && 
type3.supportedAsPrimaryKeyColumn())
             .checkAssert((version, type1, type2, type3) -> runTest(version, 
TestSchema.builder(bridge)
                     .withPartitionKey("a", type1)
                     .withPartitionKey("b", type2)
@@ -89,6 +93,7 @@ public class SparkRowIteratorTests
         for (CassandraVersion version : TestUtils.testableVersions())
         {
             qt().forAll(TestUtils.cql3Type(bridge), 
TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge), TestUtils.sortOrder())
+                .assuming((type1, type2, type3, order) -> 
type1.supportedAsPrimaryKeyColumn() && type2.supportedAsPrimaryKeyColumn())
                 .checkAssert((type1, type2, type3, order) -> runTest(version, 
TestSchema.builder(bridge)
                         .withPartitionKey("a", type1)
                         .withClusteringKey("b", type2)
@@ -105,6 +110,7 @@ public class SparkRowIteratorTests
         for (CassandraVersion version : TestUtils.testableVersions())
         {
             qt().forAll(TestUtils.cql3Type(bridge), 
TestUtils.cql3Type(bridge), TestUtils.sortOrder(), TestUtils.sortOrder())
+                .assuming((type1, type2, order1, order2) -> 
type1.supportedAsPrimaryKeyColumn() && type2.supportedAsPrimaryKeyColumn())
                 .checkAssert((type1, type2, order1, order2) -> 
runTest(version, TestSchema.builder(bridge)
                         .withPartitionKey("a", bridge.bigint())
                         .withClusteringKey("b", type1)
diff --git 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
index c63accd..3492879 100644
--- 
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
+++ 
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
@@ -43,10 +43,12 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.bridge.type.InternalDuration;
 import org.apache.cassandra.sidecar.testing.QualifiedName;
 import org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.ScalaConversionUtils;
+import org.apache.cassandra.spark.utils.SparkTypeUtils;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -58,6 +60,7 @@ import scala.collection.mutable.Seq;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
 import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
 import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.CalendarIntervalType;
 import static org.apache.spark.sql.types.DataTypes.DateType;
 import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.apache.spark.sql.types.DataTypes.LongType;
@@ -147,6 +150,9 @@ class BulkWriteDataTypesTest extends 
SharedClusterSparkIntegrationTestBase
         // Simple schema with Date as column.
         types.add(simpleDateSchemaSetup());
 
+        // Simple schema with Duration as column.
+        types.add(simpleDurationSchemaSetup());
+
         // Simple schema with integers and strings as columns.
         types.add(integersAndStringsSchemaSetup());
 
@@ -214,6 +220,16 @@ class BulkWriteDataTypesTest extends 
SharedClusterSparkIntegrationTestBase
         return setup;
     }
 
+    static TypeTestSetup simpleDurationSchemaSetup()
+    {
+        return new TypeTestSetup("duration_schema",
+                                 Arrays.asList("id", "took"),
+                                 Arrays.asList(IntegerType, 
CalendarIntervalType),
+                                 Arrays.asList(INTEGER_MAPPER, 
DURATION_MAPPER),
+                                 "CREATE TABLE %s (id int, took duration, 
PRIMARY KEY (id))",
+                                 "Cannot save interval data type into external 
storage.");
+    }
+
     static TypeTestSetup integersAndStringsSchemaSetup()
     {
         return new TypeTestSetup("simple_schema",
@@ -441,6 +457,8 @@ class BulkWriteDataTypesTest extends 
SharedClusterSparkIntegrationTestBase
     = recordNumber -> Timestamp.from(new 
Date(1731457509115L).toInstant().plus(recordNumber, ChronoUnit.SECONDS));
    // Derives a java.sql.Date from TIMESTAMP_MAPPER's value for the same record number (local date part only)
    static final Function<Integer, Object> DATE_MAPPER
    = recordNumber -> java.sql.Date.valueOf(((Timestamp) TIMESTAMP_MAPPER.apply(recordNumber)).toLocalDateTime().toLocalDate());
+    static final Function<Integer, Object> DURATION_MAPPER
+    = recordNumber -> SparkTypeUtils.convertDuration(new InternalDuration(1, 
recordNumber, recordNumber * 1000000000));
 
     static class TypeTestSetup
     {
diff --git 
a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/SparkDuration.java
 
b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/SparkDuration.java
index 6cba57c..e976e5f 100644
--- 
a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/SparkDuration.java
+++ 
b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/SparkDuration.java
@@ -19,12 +19,59 @@
 
 package org.apache.cassandra.spark.data.converter.types;
 
-public class SparkDuration implements NotImplementedFeatures
+import java.util.Comparator;
+
+import org.apache.cassandra.bridge.BigNumberConfig;
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.cassandra.spark.utils.SparkTypeUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.jetbrains.annotations.NotNull;
+
+public class SparkDuration implements SparkType
 {
     public static final SparkDuration INSTANCE = new SparkDuration();
+    private static final Comparator<CalendarInterval> 
CALENDAR_INTERVAL_COMPARATOR =
+    SparkTypeUtils.CALENDAR_INTERVAL_COMPARATOR;
+
+    @Override
+    public DataType dataType(BigNumberConfig bigNumberConfig)
+    {
+        return DataTypes.CalendarIntervalType;
+    }
+
+    @Override
+    public Object nativeSparkSqlRowValue(Row row, int position)
+    {
+        return row.isNullAt(position) ? null : row.get(position); // should 
return CalendarInterval type
+    }
 
-    private SparkDuration()
+    @Override
+    public Object nativeSparkSqlRowValue(final GenericInternalRow row, final 
int position)
     {
+        return row.isNullAt(position) ? null : row.getInterval(position);
+    }
+
+    @Override
+    public Object toSparkSqlType(@NotNull Object value, boolean isFrozen)
+    {
+        InternalDuration duration = (InternalDuration) value;
+        return SparkTypeUtils.convertDuration(duration);
+    }
 
+    @Override
+    public Object toTestRowType(Object value)
+    {
+        CalendarInterval ci = (CalendarInterval) value;
+        return SparkTypeUtils.convertDuration(ci);
+    }
+
+    @Override
+    public int compareTo(Object first, Object second)
+    {
+        return CALENDAR_INTERVAL_COMPARATOR.compare((CalendarInterval) first, 
(CalendarInterval) second);
     }
 }
diff --git 
a/cassandra-analytics-spark-converter/src/main/scala-2.11-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
 
b/cassandra-analytics-spark-converter/src/main/scala-2.11-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
new file mode 100644
index 0000000..4df97e3
--- /dev/null
+++ 
b/cassandra-analytics-spark-converter/src/main/scala-2.11-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.spark.unsafe.types.CalendarInterval;
+
/**
 * Conversions between the Cassandra-internal {@code InternalDuration} and Spark 2's
 * {@code CalendarInterval}, plus a canonical comparator for intervals.
 * Spark 2's CalendarInterval carries only (months, microseconds) — the two-argument
 * constructor used below — so the days component is folded into microseconds.
 */
public final class SparkTypeUtils
{
    private SparkTypeUtils()
    {
        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
    }

    // Orders by months, then microseconds — the only two fields Spark 2 intervals have
    public static final Comparator<CalendarInterval> CALENDAR_INTERVAL_COMPARATOR =
    Comparator.<CalendarInterval>comparingInt(interval -> interval.months)
              .thenComparingLong(interval -> interval.microseconds);

    public static CalendarInterval convertDuration(InternalDuration duration)
    {
        // Unfortunately, it loses precision when converting to the spark data type.
        // (nanoseconds are truncated to whole microseconds)
        long micros = TimeUnit.NANOSECONDS.toMicros(duration.nanoseconds);
        micros += duration.days * CalendarInterval.MICROS_PER_DAY;
        return new CalendarInterval(duration.months, micros);
    }

    public static InternalDuration convertDuration(CalendarInterval interval)
    {
        // Inverse of the above: split microseconds back into whole days + remainder.
        // Ints.checkedCast guards against a day count that does not fit in an int.
        int days = Ints.checkedCast(interval.microseconds / CalendarInterval.MICROS_PER_DAY);
        long microsRemain = interval.microseconds % CalendarInterval.MICROS_PER_DAY;
        return new InternalDuration(interval.months, days, TimeUnit.MICROSECONDS.toNanos(microsRemain));
    }
}
diff --git 
a/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
 
b/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
new file mode 100644
index 0000000..4df97e3
--- /dev/null
+++ 
b/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.spark.unsafe.types.CalendarInterval;
+
/**
 * Conversions between the Cassandra-internal {@code InternalDuration} and Spark 2's
 * {@code CalendarInterval}, plus a canonical comparator for intervals.
 * Spark 2's CalendarInterval carries only (months, microseconds) — the two-argument
 * constructor used below — so the days component is folded into microseconds.
 */
public final class SparkTypeUtils
{
    private SparkTypeUtils()
    {
        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
    }

    // Orders by months, then microseconds — the only two fields Spark 2 intervals have
    public static final Comparator<CalendarInterval> CALENDAR_INTERVAL_COMPARATOR =
    Comparator.<CalendarInterval>comparingInt(interval -> interval.months)
              .thenComparingLong(interval -> interval.microseconds);

    public static CalendarInterval convertDuration(InternalDuration duration)
    {
        // Unfortunately, it loses precision when converting to the spark data type.
        // (nanoseconds are truncated to whole microseconds)
        long micros = TimeUnit.NANOSECONDS.toMicros(duration.nanoseconds);
        micros += duration.days * CalendarInterval.MICROS_PER_DAY;
        return new CalendarInterval(duration.months, micros);
    }

    public static InternalDuration convertDuration(CalendarInterval interval)
    {
        // Inverse of the above: split microseconds back into whole days + remainder.
        // Ints.checkedCast guards against a day count that does not fit in an int.
        int days = Ints.checkedCast(interval.microseconds / CalendarInterval.MICROS_PER_DAY);
        long microsRemain = interval.microseconds % CalendarInterval.MICROS_PER_DAY;
        return new InternalDuration(interval.months, days, TimeUnit.MICROSECONDS.toNanos(microsRemain));
    }
}
diff --git 
a/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
 
b/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
new file mode 100644
index 0000000..4d200b3
--- /dev/null
+++ 
b/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.spark.unsafe.types.CalendarInterval;
+
/**
 * Conversions between the Cassandra-internal {@code InternalDuration} and Spark 3's
 * {@code CalendarInterval}, plus a canonical comparator for intervals.
 * Spark 3's CalendarInterval carries (months, days, microseconds) — the three-argument
 * constructor used below — so the days component is preserved as-is.
 */
public final class SparkTypeUtils
{
    private SparkTypeUtils()
    {
        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
    }

    // Orders by months, then days, then microseconds
    public static final Comparator<CalendarInterval> CALENDAR_INTERVAL_COMPARATOR =
    Comparator.<CalendarInterval>comparingInt(interval -> interval.months)
              .thenComparingInt(interval -> interval.days)
              .thenComparingLong(interval -> interval.microseconds);

    public static CalendarInterval convertDuration(InternalDuration duration)
    {
        // Unfortunately, it loses precision when converting to the spark data type.
        // (nanoseconds are truncated to whole microseconds)
        return new CalendarInterval(duration.months, duration.days, TimeUnit.NANOSECONDS.toMicros(duration.nanoseconds));
    }

    public static InternalDuration convertDuration(CalendarInterval interval)
    {
        // Inverse of the above; exact because microseconds map losslessly onto nanoseconds
        return new InternalDuration(interval.months, interval.days, TimeUnit.MICROSECONDS.toNanos(interval.microseconds));
    }
}
diff --git 
a/cassandra-analytics-spark-converter/src/main/scala-2.13-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
 
b/cassandra-analytics-spark-converter/src/main/scala-2.13-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
new file mode 100644
index 0000000..4d200b3
--- /dev/null
+++ 
b/cassandra-analytics-spark-converter/src/main/scala-2.13-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.spark.unsafe.types.CalendarInterval;
+
/**
 * Conversions between the Cassandra-internal {@code InternalDuration} and Spark 3's
 * {@code CalendarInterval}, plus a canonical comparator for intervals.
 * Spark 3's CalendarInterval carries (months, days, microseconds) — the three-argument
 * constructor used below — so the days component is preserved as-is.
 */
public final class SparkTypeUtils
{
    private SparkTypeUtils()
    {
        throw new IllegalStateException(getClass() + " is static utility class and shall not be instantiated");
    }

    // Orders by months, then days, then microseconds
    public static final Comparator<CalendarInterval> CALENDAR_INTERVAL_COMPARATOR =
    Comparator.<CalendarInterval>comparingInt(interval -> interval.months)
              .thenComparingInt(interval -> interval.days)
              .thenComparingLong(interval -> interval.microseconds);

    public static CalendarInterval convertDuration(InternalDuration duration)
    {
        // Unfortunately, it loses precision when converting to the spark data type.
        // (nanoseconds are truncated to whole microseconds)
        return new CalendarInterval(duration.months, duration.days, TimeUnit.NANOSECONDS.toMicros(duration.nanoseconds));
    }

    public static InternalDuration convertDuration(CalendarInterval interval)
    {
        // Inverse of the above; exact because microseconds map losslessly onto nanoseconds
        return new InternalDuration(interval.months, interval.days, TimeUnit.MICROSECONDS.toNanos(interval.microseconds));
    }
}
diff --git 
a/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java
 
b/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java
index 46e7a7d..b667185 100644
--- 
a/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java
+++ 
b/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java
@@ -754,14 +754,14 @@ public final class TestSchema
             Object[] result = new Object[end - start];
             for (int sourceIndex = start, destinationIndex = 0; sourceIndex < 
end; sourceIndex++, destinationIndex++)
             {
-                result[destinationIndex] = 
convertForCqlWriter(getType(sourceIndex), values[sourceIndex]);
+                result[destinationIndex] = 
convertForCqlWriter(getType(sourceIndex), values[sourceIndex], false);
             }
             return result;
         }
 
        // Delegates to the type's version-aware converter. 'isCollectionElement' appears to signal
        // that the value is written as an element inside a collection rather than a top-level column
        // — TODO(review): confirm against CqlField.CqlType.convertForCqlWriter implementations.
        private Object convertForCqlWriter(CqlField.CqlType type, Object value, boolean isCollectionElement)
        {
            return type.convertForCqlWriter(value, version, isCollectionElement);
        }
 
         public CqlField.CqlType getType(int position)
diff --git 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/DateTypeTests.java
 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/DateTypeTests.java
index 73de41f..d4e72a0 100644
--- 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/DateTypeTests.java
+++ 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/DateTypeTests.java
@@ -49,7 +49,7 @@ public class DateTypeTests
         assertEquals(2021, end.getYear());
         assertEquals(7, end.getMonthValue());
         assertEquals(16, end.getDayOfMonth());
-        Object cqlWriterObj = Date.INSTANCE.convertForCqlWriter(numDays, 
BRIDGE.getVersion());
+        Object cqlWriterObj = Date.INSTANCE.convertForCqlWriter(numDays, 
BRIDGE.getVersion(), false);
         org.apache.cassandra.cql3.functions.types.LocalDate cqlWriterDate = 
(org.apache.cassandra.cql3.functions.types.LocalDate) cqlWriterObj;
         assertEquals(2021, cqlWriterDate.getYear());
         assertEquals(7, cqlWriterDate.getMonth());
diff --git 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/PartitionKeyTests.java
 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/PartitionKeyTests.java
index 9a1aabe..2373434 100644
--- 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/PartitionKeyTests.java
+++ 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/PartitionKeyTests.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
 import org.apache.cassandra.bridge.CassandraBridgeImplementation;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.CqlType;
 import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter;
@@ -49,24 +50,26 @@ public class PartitionKeyTests
    @SuppressWarnings("static-access")
    public void testBuildPartitionKey()
    {
        // For every type usable as a primary-key column, builds a single-column partition key
        // from its string form and verifies it deserializes back to the original random value,
        // both as a Cassandra-native value and after the SparkSQL round trip.
        qt().forAll(arbitrary().pick(BRIDGE.supportedTypes()))
            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
            .checkAssert(partitionKeyType -> {
                CqlTable table = TestSchema.builder(BRIDGE)
                                           .withPartitionKey("a", partitionKeyType)
                                           .withClusteringKey("b", BRIDGE.aInt())
                                           .withColumn("c", BRIDGE.aInt())
                                           .build()
                                           .buildTable();
                Object value = partitionKeyType.randomValue(100);
                String string = ((CqlType) partitionKeyType).serializer().toString(value);
                ByteBuffer buffer = BRIDGE.buildPartitionKey(table, Collections.singletonList(string));
                Object cassandraValue = partitionKeyType.deserializeToJavaType(buffer);

                // compare using Cassandra types
                assertTrue(ComparisonUtils.equals(value, cassandraValue));

                // convert SparkSQL types back into test row types to compare
                Object sparkSqlValue = TYPE_CONVERTER.convert(partitionKeyType, cassandraValue, false);
                assertTrue(ComparisonUtils.equals(value, TYPE_CONVERTER.toTestRowType(partitionKeyType, sparkSqlValue)));
            });
    }
 
@@ -74,30 +77,32 @@ public class PartitionKeyTests
     @SuppressWarnings("static-access")
     public void testBuildCompositePartitionKey()
     {
-        
qt().forAll(arbitrary().pick(BRIDGE.supportedTypes())).checkAssert(partitionKeyType
 -> {
-            CqlTable table = TestSchema.builder(BRIDGE)
-                                       .withPartitionKey("a", BRIDGE.aInt())
-                                       .withPartitionKey("b", partitionKeyType)
-                                       .withPartitionKey("c", BRIDGE.text())
-                                       .withClusteringKey("d", BRIDGE.aInt())
-                                       .withColumn("e", BRIDGE.aInt())
-                                       .build()
-                                       .buildTable();
-            List<AbstractType<?>> partitionKeyColumnTypes = 
BRIDGE.partitionKeyColumnTypes(table);
-            CompositeType compositeType = 
CompositeType.getInstance(partitionKeyColumnTypes);
+        qt().forAll(arbitrary().pick(BRIDGE.supportedTypes()))
+            .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
+            .checkAssert(partitionKeyType -> {
+                CqlTable table = TestSchema.builder(BRIDGE)
+                                           .withPartitionKey("a", 
BRIDGE.aInt())
+                                           .withPartitionKey("b", 
partitionKeyType)
+                                           .withPartitionKey("c", 
BRIDGE.text())
+                                           .withClusteringKey("d", 
BRIDGE.aInt())
+                                           .withColumn("e", BRIDGE.aInt())
+                                           .build()
+                                           .buildTable();
+                List<AbstractType<?>> partitionKeyColumnTypes = 
BRIDGE.partitionKeyColumnTypes(table);
+                CompositeType compositeType = 
CompositeType.getInstance(partitionKeyColumnTypes);
 
-            int columnA = (int) BRIDGE.aInt().randomValue(1024);
-            Object columnB = partitionKeyType.randomValue(1024);
-            String columnBString = ((CqlType) 
partitionKeyType).serializer().toString(columnB);
-            String columnC = (String) BRIDGE.text().randomValue(1024);
+                int columnA = (int) BRIDGE.aInt().randomValue(1024);
+                Object columnB = partitionKeyType.randomValue(1024);
+                String columnBString = ((CqlType) 
partitionKeyType).serializer().toString(columnB);
+                String columnC = (String) BRIDGE.text().randomValue(1024);
 
-            ByteBuffer buffer = BRIDGE.buildPartitionKey(table, 
Arrays.asList(Integer.toString(columnA), columnBString, columnC));
-            ByteBuffer[] buffers = compositeType.split(buffer);
-            assertEquals(3, buffers.length);
+                ByteBuffer buffer = BRIDGE.buildPartitionKey(table, 
Arrays.asList(Integer.toString(columnA), columnBString, columnC));
+                ByteBuffer[] buffers = compositeType.split(buffer);
+                assertEquals(3, buffers.length);
 
-            assertEquals(columnA, buffers[0].getInt());
-            assertEquals(columnB, 
partitionKeyType.deserializeToJavaType(buffers[1]));
-            assertEquals(columnC, 
TYPE_CONVERTER.toSparkType(BRIDGE.text()).toTestRowType(BRIDGE.text().deserializeToJavaType(buffers[2])));
+                assertEquals(columnA, buffers[0].getInt());
+                assertEquals(columnB, 
partitionKeyType.deserializeToJavaType(buffers[1]));
+                assertEquals(columnC, 
TYPE_CONVERTER.toSparkType(BRIDGE.text()).toTestRowType(BRIDGE.text().deserializeToJavaType(buffers[2])));
         });
     }
 }
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
index 8c59e64..a3117f4 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
@@ -54,7 +54,7 @@ public abstract class NativeType extends CqlType implements 
CqlField.NativeType
     }
 
     @Override
-    public Object convertForCqlWriter(Object value, CassandraVersion version)
+    public Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement)
     {
         return value;
     }
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
index fd7928f..dd14ee3 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
@@ -129,9 +129,9 @@ public class CqlFrozen extends CqlType implements 
CqlField.CqlFrozen
     }
 
     @Override
-    public Object convertForCqlWriter(Object value, CassandraVersion version)
+    public Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement)
     {
-        return inner.convertForCqlWriter(value, version);
+        return inner.convertForCqlWriter(value, version, false);
     }
 
     @Override
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
index 6a47000..f04c1c2 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
@@ -93,10 +93,10 @@ public class CqlList extends CqlCollection implements 
CqlField.CqlList
     }
 
     @Override
-    public Object convertForCqlWriter(Object value, CassandraVersion version)
+    public Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement)
     {
         return ((List<?>) value).stream()
-                                .map(element -> 
type().convertForCqlWriter(element, version))
+                                .map(element -> 
type().convertForCqlWriter(element, version, true))
                                 .collect(Collectors.toList());
     }
 
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
index 39edf62..6d0e49d 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
@@ -107,12 +107,12 @@ public class CqlMap extends CqlCollection implements 
CqlField.CqlMap
 
     @Override
     @SuppressWarnings("unchecked")
-    public Object convertForCqlWriter(Object value, CassandraVersion version)
+    public Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement)
     {
         Map<Object, Object> map = (Map<Object, Object>) value;
         return map.entrySet().stream()
-                .collect(Collectors.toMap(element -> 
keyType().convertForCqlWriter(element.getKey(), version),
-                                          element -> 
valueType().convertForCqlWriter(element.getValue(), version)));
+                .collect(Collectors.toMap(element -> 
keyType().convertForCqlWriter(element.getKey(), version, true),
+                                          element -> 
valueType().convertForCqlWriter(element.getValue(), version, true)));
     }
 
     @Override
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
index 84f649d..c50b805 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
@@ -92,10 +92,10 @@ public class CqlSet extends CqlList implements 
CqlField.CqlSet
     }
 
     @Override
-    public Object convertForCqlWriter(Object value, CassandraVersion version)
+    public Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement)
     {
         return ((Set<?>) value).stream()
-                               .map(element -> 
type().convertForCqlWriter(element, version))
+                               .map(element -> 
type().convertForCqlWriter(element, version, true))
                                .collect(Collectors.toSet());
     }
 
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
index 32f3bab..28986a1 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
@@ -150,7 +150,7 @@ public class CqlTuple extends CqlCollection implements 
CqlField.CqlTuple
     }
 
     @Override
-    public Object convertForCqlWriter(Object value, CassandraVersion version)
+    public Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement)
     {
         return toTupleValue(version, this, value);
     }
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
index 8477c2b..31a8187 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
@@ -113,7 +113,7 @@ public class CqlUdt extends CqlType implements 
CqlField.CqlUdt
     }
 
     @Override
-    public Object convertForCqlWriter(@NotNull Object value, CassandraVersion 
version)
+    public Object convertForCqlWriter(@NotNull Object value, CassandraVersion 
version, boolean isCollectionElement)
     {
         if (value instanceof UDTValue)
         {
@@ -419,7 +419,7 @@ public class CqlUdt extends CqlType implements 
CqlField.CqlUdt
                                              int position,
                                              @Nullable Object value)
     {
-        type.setInnerValue(udtValue, position, value == null ? null : 
type.convertForCqlWriter(value, version));
+        type.setInnerValue(udtValue, position, value == null ? null : 
type.convertForCqlWriter(value, version, false));
     }
 
     public static UserType toUserType(CqlUdt udt)
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
index 0f0044e..e8d9978 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
@@ -62,7 +62,7 @@ public class Date extends NativeType
     }
 
     @Override
-    public Object convertForCqlWriter(Object value, CassandraVersion version)
+    public Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement)
     {
         // Cassandra 4.0 no longer allows writing date types as Integers in 
CqlWriter,
         // so we need to convert to LocalDate before writing in tests
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
index 1e8ca98..264f8fa 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
@@ -19,7 +19,22 @@
 
 package org.apache.cassandra.spark.data.types;
 
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.cassandra.cql3.functions.types.DataType;
+import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.DurationType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.spark.data.NativeType;
+import org.apache.cassandra.spark.utils.RandomUtils;
+import org.jetbrains.annotations.NotNull;
 
 public class Duration extends NativeType
 {
@@ -32,8 +47,141 @@ public class Duration extends NativeType
     }
 
     @Override
-    public boolean isSupported()
+    public AbstractType<?> dataType()
+    {
+        return DurationType.instance;
+    }
+
+    public <T> TypeSerializer<T> serializer()
+    {
+        return (TypeSerializer<T>) AnalyticsDurationSerializer.INSTANCE;
+    }
+
+    @Override
+    public DataType driverDataType(boolean isFrozen)
+    {
+        return DataType.duration();
+    }
+
+    @Override
+    public boolean supportedAsPrimaryKeyColumn()
     {
         return false;
     }
+
+    @Override
+    public boolean supportedAsMapKey()
+    {
+        return false;
+    }
+
+    @Override
+    public boolean supportedAsSetElement()
+    {
+        return false;
+    }
+
+    @Override
+    public Object convertForCqlWriter(Object value, CassandraVersion version, 
boolean isCollectionElement)
+    {
+        InternalDuration duration = (InternalDuration) value;
+        return isCollectionElement
+               ? AnalyticsDurationSerializer.toCql3FunctionDuration(duration)
+               : AnalyticsDurationSerializer.toCql3Duration(duration);
+    }
+
+    @Override
+    protected void setInnerValueInternal(SettableByIndexData<?> udtValue, int 
position, @NotNull Object value)
+    {
+        org.apache.cassandra.cql3.functions.types.Duration duration = null;
+        if (value instanceof InternalDuration)
+        {
+            duration = 
AnalyticsDurationSerializer.toCql3FunctionDuration((InternalDuration) value);
+        }
+        else if (value instanceof org.apache.cassandra.cql3.Duration)
+        {
+            duration = 
AnalyticsDurationSerializer.toCql3FunctionDuration((org.apache.cassandra.cql3.Duration)
 value);
+        }
+        else
+        {
+            duration = (org.apache.cassandra.cql3.functions.types.Duration) 
value;
+        }
+        udtValue.set(position, duration, 
org.apache.cassandra.cql3.functions.types.Duration.class);
+    }
+
+    @Override
+    public Object randomValue(int minCollectionSize)
+    {
+        return new InternalDuration(RandomUtils.randomPositiveInt(100),
+                                    RandomUtils.randomPositiveInt(100),
+                                    
TimeUnit.MICROSECONDS.toNanos(RandomUtils.randomPositiveInt(1000000)));
+    }
+
+    /**
+     * Serializes Spark {@code CalendarInterval} type as CQL {@link 
org.apache.cassandra.cql3.Duration}.
+     * Implementation note: {@code TypeSerializer<InternalDuration>} is used to prevent a class cast exception from
+     * {@code CalendarInterval}, which is not a direct dependency of this module.
+     */
+    private static class AnalyticsDurationSerializer extends 
TypeSerializer<InternalDuration>
+    {
+        private static final AnalyticsDurationSerializer INSTANCE = new 
AnalyticsDurationSerializer();
+
+        @Override
+        public ByteBuffer serialize(InternalDuration value)
+        {
+            org.apache.cassandra.cql3.Duration cqlDuration = 
toCql3Duration(value);
+            return 
org.apache.cassandra.serializers.DurationSerializer.instance.serialize(cqlDuration);
+        }
+
+        public <V> InternalDuration deserialize(V v, ValueAccessor<V> 
valueAccessor)
+        {
+            org.apache.cassandra.cql3.Duration cqlDuration = 
org.apache.cassandra.serializers.DurationSerializer.instance.deserialize(v, 
valueAccessor);
+            return fromCql3Duration(cqlDuration);
+        }
+
+        public <V> void validate(V v, ValueAccessor<V> valueAccessor) throws 
MarshalException
+        {
+            
org.apache.cassandra.serializers.DurationSerializer.instance.validate(v, 
valueAccessor);
+        }
+
+        public String toString(InternalDuration duration)
+        {
+            return duration == null ? "" : duration.toString();
+        }
+
+        public Class<InternalDuration> getType()
+        {
+            return InternalDuration.class;
+        }
+
+        public static org.apache.cassandra.cql3.Duration 
toCql3Duration(InternalDuration duration)
+        {
+            return nullOrConvert(duration, d -> 
org.apache.cassandra.cql3.Duration.newInstance(d.months, d.days, 
d.nanoseconds));
+        }
+
+        public static org.apache.cassandra.cql3.functions.types.Duration 
toCql3FunctionDuration(InternalDuration duration)
+        {
+            return nullOrConvert(duration, d -> 
org.apache.cassandra.cql3.functions.types.Duration.newInstance(d.months, 
d.days, d.nanoseconds));
+        }
+
+        public static org.apache.cassandra.cql3.functions.types.Duration 
toCql3FunctionDuration(org.apache.cassandra.cql3.Duration duration)
+        {
+            return nullOrConvert(duration, d -> 
org.apache.cassandra.cql3.functions.types.Duration.newInstance(d.getMonths(), 
d.getDays(), d.getNanoseconds()));
+        }
+
+        public static InternalDuration 
fromCql3Duration(org.apache.cassandra.cql3.Duration duration)
+        {
+            return nullOrConvert(duration, d -> new 
InternalDuration(d.getMonths(), d.getDays(), d.getNanoseconds()));
+        }
+
+        private static <I, O> O nullOrConvert(I input, Function<I, O> 
converter)
+        {
+            if (input == null)
+            {
+                return null;
+            }
+
+            return converter.apply(input);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to