This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5dcb065 CASSANALYTICS-18: Support CQL duration type (#102)
5dcb065 is described below
commit 5dcb065dc8de0582f6e72b137798f2b883540f73
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Tue Mar 18 18:07:35 2025 +0100
CASSANALYTICS-18: Support CQL duration type (#102)
Patch by Lukasz Antoniak; Reviewed by Francisco Guerrero, James Berragan,
Yifan Cai for CASSANALYTICS-18
---
CHANGES.txt | 1 +
.../java/org/apache/cassandra/cdc/CdcTests.java | 7 +
.../cassandra/cdc/CollectionDeletionTests.java | 1 +
.../cassandra/cdc/PartitionDeletionTests.java | 1 +
.../apache/cassandra/cdc/RangeDeletionTests.java | 1 +
.../cassandra/bridge/type/InternalDuration.java | 78 +++++++++++
.../org/apache/cassandra/spark/data/CqlField.java | 26 +++-
.../bridge/type/InternalDurationTest.java | 24 ++--
.../cassandra/spark/bulkwriter/RecordWriter.java | 2 +-
.../spark/bulkwriter/SqlToCqlTypeConverter.java | 35 ++++-
.../org/apache/cassandra/spark/EndToEndTests.java | 17 +++
.../bulkwriter/SqlToCqlTypeConverterTest.java | 2 +
.../spark/reader/DataTypeSerializationTests.java | 85 +++++++-----
.../spark/sparksql/SparkRowIteratorTests.java | 6 +
.../analytics/BulkWriteDataTypesTest.java | 18 +++
.../spark/data/converter/types/SparkDuration.java | 51 ++++++-
.../cassandra/spark/utils/SparkTypeUtils.java | 55 ++++++++
.../cassandra/spark/utils/SparkTypeUtils.java | 55 ++++++++
.../cassandra/spark/utils/SparkTypeUtils.java | 50 +++++++
.../cassandra/spark/utils/SparkTypeUtils.java | 50 +++++++
.../cassandra/spark/utils/test/TestSchema.java | 6 +-
.../spark/data/converter/types/DateTypeTests.java | 2 +-
.../cassandra/spark/reader/PartitionKeyTests.java | 79 ++++++-----
.../apache/cassandra/spark/data/NativeType.java | 2 +-
.../cassandra/spark/data/complex/CqlFrozen.java | 4 +-
.../cassandra/spark/data/complex/CqlList.java | 4 +-
.../cassandra/spark/data/complex/CqlMap.java | 6 +-
.../cassandra/spark/data/complex/CqlSet.java | 4 +-
.../cassandra/spark/data/complex/CqlTuple.java | 2 +-
.../cassandra/spark/data/complex/CqlUdt.java | 4 +-
.../apache/cassandra/spark/data/types/Date.java | 2 +-
.../cassandra/spark/data/types/Duration.java | 150 ++++++++++++++++++++-
32 files changed, 725 insertions(+), 105 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 086a5e4..3e7e9b7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Support CQL duration type (CASSANALYTICS-18)
* Add Schema Store interfaces for CDC (CASSANALYTICS-8)
* Add CassandraBridge helper APIs that can be used by external tooling
(CASSANALYTICS-4)
* Refactor to decouple RowIterator and CellIterator from Spark so bulk reads
can be performed outside of Spark (CASSANDRA-20259)
diff --git
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java
index 38c59db..28c29c7 100644
---
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java
+++
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CdcTests.java
@@ -342,6 +342,7 @@ public class CdcTests
public void testClusteringKey()
{
qt().forAll(cql3Type(BRIDGE))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(type ->
testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
.withPartitionKey("pk", BRIDGE.uuid())
@@ -371,6 +372,9 @@ public class CdcTests
public void testMultipleClusteringKeys()
{
qt().withExamples(50).forAll(cql3Type(BRIDGE), cql3Type(BRIDGE),
cql3Type(BRIDGE))
+ .assuming((t1, t2, t3) -> t1.supportedAsPrimaryKeyColumn()
+ && t2.supportedAsPrimaryKeyColumn()
+ && t3.supportedAsPrimaryKeyColumn())
.checkAssert(
(t1, t2, t3) ->
testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
@@ -407,6 +411,7 @@ public class CdcTests
public void testSet()
{
qt().forAll(cql3Type(BRIDGE))
+ .assuming(CqlField.CqlType::supportedAsSetElement)
.checkAssert(
t -> testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
.withPartitionKey("pk",
BRIDGE.uuid())
@@ -487,6 +492,7 @@ public class CdcTests
public void testMap()
{
qt().withExamples(50).forAll(cql3Type(BRIDGE), cql3Type(BRIDGE))
+ .assuming((t1, t2) -> t1.supportedAsMapKey() &&
t2.supportedAsMapKey())
.checkAssert(
(t1, t2) -> testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
.withPartitionKey("pk", BRIDGE.uuid())
@@ -766,6 +772,7 @@ public class CdcTests
public void testCompositePartitionKey()
{
qt().forAll(cql3Type(BRIDGE))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(
type ->
testWith(BRIDGE, directory, TestSchema.builder(BRIDGE)
diff --git
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java
index b736bb1..9f3decb 100644
---
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java
+++
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/CollectionDeletionTests.java
@@ -98,6 +98,7 @@ public class CollectionDeletionTests
final long minTimestamp = System.currentTimeMillis();
final int numRows = 1000;
qt().forAll(cql3Type(BRIDGE))
+ .assuming(CqlField.CqlType::supportedAsMapKey)
.checkAssert(
type -> testWith(BRIDGE, directory, schemaBuilder.apply(type))
.withAddLastModificationTime(true)
diff --git
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/PartitionDeletionTests.java
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/PartitionDeletionTests.java
index 52496cc..9ae344d 100644
---
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/PartitionDeletionTests.java
+++
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/PartitionDeletionTests.java
@@ -105,6 +105,7 @@ public class PartitionDeletionTests
final long minTimestamp = System.currentTimeMillis();
final int numRows = 1000;
qt().forAll(cql3Type(BRIDGE))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(type -> {
testWith(BRIDGE, directory, schemaBuilder.apply(type))
.withAddLastModificationTime(true)
diff --git
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/RangeDeletionTests.java
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/RangeDeletionTests.java
index e83a88b..0cd9937 100644
---
a/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/RangeDeletionTests.java
+++
b/cassandra-analytics-cdc/src/test/java/org/apache/cassandra/cdc/RangeDeletionTests.java
@@ -110,6 +110,7 @@ public class RangeDeletionTests
long minTimestamp = System.currentTimeMillis();
int numRows = 1000;
qt().forAll(cql3Type(BRIDGE))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(
type ->
testWith(BRIDGE, directory, schemaBuilder.apply(type))
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/type/InternalDuration.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/type/InternalDuration.java
new file mode 100644
index 0000000..b2ac781
--- /dev/null
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/bridge/type/InternalDuration.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.bridge.type;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Analytics-internal Duration type to bridge CQL Duration type.
+ */
+public class InternalDuration implements Serializable
+{
+ private static final long serialVersionUID = -6867844786318937949L;
+ public final int months;
+ public final int days;
+ public final long nanoseconds;
+
+ public InternalDuration(int months, int days, long nanoseconds)
+ {
+ this.months = months;
+ this.days = days;
+ this.nanoseconds = nanoseconds;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+ InternalDuration that = (InternalDuration) o;
+ return months == that.months &&
+ days == that.days &&
+ nanoseconds == that.nanoseconds;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(months, days, nanoseconds);
+ }
+
+ @Override
+ public String toString()
+ {
+ StringBuilder builder = new StringBuilder();
+ if (this.months < 0 || this.days < 0 || this.nanoseconds < 0L)
+ {
+ builder.append('-');
+ }
+ builder.append("mo").append(Math.abs(this.months));
+ builder.append("d").append(Math.abs(this.days));
+ builder.append("ns").append(Math.abs(this.nanoseconds));
+ return builder.toString();
+ }
+}
diff --git
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java
index 05523ce..1c15fad 100644
---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java
+++
b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java
@@ -109,6 +109,30 @@ public class CqlField implements Serializable,
Comparable<CqlField>
return false;
}
+ /**
+ * @return true if type can be part of primary key
+ */
+ default boolean supportedAsPrimaryKeyColumn()
+ {
+ return true;
+ }
+
+ /**
+ * @return true if type can be used as map key
+ */
+ default boolean supportedAsMapKey()
+ {
+ return true;
+ }
+
+ /**
+ * @return true if type can be used as set element
+ */
+ default boolean supportedAsSetElement()
+ {
+ return true;
+ }
+
default Object deserializeToType(TypeConverter converter, ByteBuffer
buffer)
{
return deserializeToType(converter, buffer, isFrozen());
@@ -153,7 +177,7 @@ public class CqlField implements Serializable,
Comparable<CqlField>
Object randomValue(int minCollectionSize);
@VisibleForTesting
- Object convertForCqlWriter(Object value, CassandraVersion version);
+ Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement);
// Kryo Serialization
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
b/cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/type/InternalDurationTest.java
similarity index 63%
copy from
cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
copy to
cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/type/InternalDurationTest.java
index 1e8ca98..d1c1c19 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
+++
b/cassandra-analytics-common/src/test/java/org/apache/cassandra/bridge/type/InternalDurationTest.java
@@ -17,23 +17,21 @@
* under the License.
*/
-package org.apache.cassandra.spark.data.types;
+package org.apache.cassandra.bridge.type;
-import org.apache.cassandra.spark.data.NativeType;
+import org.junit.jupiter.api.Test;
-public class Duration extends NativeType
-{
- public static final Duration INSTANCE = new Duration();
+import static org.junit.jupiter.api.Assertions.assertEquals;
- @Override
- public String name()
+public class InternalDurationTest
+{
+ @Test
+ public void testDurationToString()
{
- return "duration";
- }
+ InternalDuration duration = new InternalDuration(1, 2, 123456789123L);
+ assertEquals("mo1d2ns123456789123", duration.toString());
- @Override
- public boolean isSupported()
- {
- return false;
+ duration = new InternalDuration(0, -3, 987L);
+ assertEquals("-mo0d3ns987", duration.toString());
}
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 83832fd..633e7ef 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -400,7 +400,7 @@ public class RecordWriter
udtValue.udtMap.put(entry.getKey(),
maybeConvertUdt(entry.getValue()));
}
}
- return getUdt(udtValue.name).convertForCqlWriter(udtValue.udtMap,
writerContext.bridge().getVersion());
+ return getUdt(udtValue.name).convertForCqlWriter(udtValue.udtMap,
writerContext.bridge().getVersion(), false);
}
return value;
}
diff --git
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
index d148de7..b7b8392 100644
---
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
+++
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverter.java
@@ -41,10 +41,13 @@ import com.google.common.net.InetAddresses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.bridge.type.InternalDuration;
import org.apache.cassandra.spark.data.BridgeUdtValue;
import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.utils.SparkTypeUtils;
import org.apache.cassandra.spark.utils.UUIDs;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
+import org.apache.spark.unsafe.types.CalendarInterval;
import scala.Tuple2;
import static
org.apache.cassandra.spark.utils.ScalaConversionUtils.asJavaIterable;
@@ -61,6 +64,7 @@ public final class SqlToCqlTypeConverter implements
Serializable
public static final String DATE = "date";
public static final String DECIMAL = "decimal";
public static final String DOUBLE = "double";
+ public static final String DURATION = "duration";
public static final String FLOAT = "float";
public static final String FROZEN = "frozen";
public static final String INET = "inet";
@@ -94,6 +98,7 @@ public final class SqlToCqlTypeConverter implements
Serializable
private static final TimeUUIDConverter TIME_UUID_CONVERTER = new
TimeUUIDConverter();
private static final InetAddressConverter INET_ADDRESS_CONVERTER = new
InetAddressConverter();
public static final DateConverter DATE_CONVERTER = new DateConverter();
+ public static final DurationConverter DURATION_CONVERTER = new
DurationConverter();
private SqlToCqlTypeConverter()
{
@@ -127,6 +132,8 @@ public final class SqlToCqlTypeConverter implements
Serializable
return BIG_DECIMAL_CONVERTER;
case DOUBLE:
return NO_OP_CONVERTER;
+ case DURATION:
+ return DURATION_CONVERTER;
case FLOAT:
return NO_OP_CONVERTER;
case FROZEN:
@@ -213,7 +220,7 @@ public final class SqlToCqlTypeConverter implements
Serializable
}
}
- abstract static class Converter<T> implements Serializable
+ public abstract static class Converter<T> implements Serializable
{
public T convert(Object object)
{
@@ -547,6 +554,32 @@ public final class SqlToCqlTypeConverter implements
Serializable
}
}
+ public static class DurationConverter extends
NullableConverter<InternalDuration>
+ {
+ @Override
+ public InternalDuration convertInternal(Object object)
+ {
+ // TODO: Support conversion from ISO8601 duration representation
and standard form (check Java driver).
+ // Currently Spark does not support persisting CalendarInterval,
so this is optional.
+ // Test simpleDurationSchemaSetup() shall fail when Spark
supports it.
+ if (object instanceof CalendarInterval)
+ {
+ CalendarInterval cl = (CalendarInterval) object;
+ return SparkTypeUtils.convertDuration(cl);
+ }
+ else
+ {
+ throw new RuntimeException("Unsupported conversion for
DURATION from " + object.getClass().getTypeName());
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Duration";
+ }
+ }
+
public static class TimeConverter extends NullableConverter<Long>
{
@Override
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
index 232ecb7..8195dad 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
@@ -155,6 +155,7 @@ public class EndToEndTests
public void testSingleClusteringKeyOrderBy(CassandraBridge bridge)
{
qt().forAll(TestUtils.cql3Type(bridge), TestUtils.sortOrder())
+ .assuming((clusteringKeyType, sortOrder) ->
clusteringKeyType.supportedAsPrimaryKeyColumn())
.checkAssert((clusteringKeyType, sortOrder) ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("a",
bridge.bigint())
@@ -206,6 +207,7 @@ public class EndToEndTests
{
// Test partition key can be read for all data types
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(partitionKeyType -> {
// Boolean or empty types have limited cardinality
int numRows = partitionKeyType.cardinality(10);
@@ -885,6 +887,7 @@ public class EndToEndTests
public void testSet(CassandraBridge bridge)
{
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsSetElement)
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -914,6 +917,7 @@ public class EndToEndTests
{
qt().withExamples(50) // Limit number of tests otherwise n x n tests
takes too long
.forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+ .assuming((keyType, valueType) -> keyType.supportedAsMapKey())
.checkAssert((keyType, valueType) ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -943,6 +947,7 @@ public class EndToEndTests
{
// pk -> a frozen<set<?>>
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsSetElement)
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -974,6 +979,7 @@ public class EndToEndTests
// pk -> a frozen<map<?, ?>>
qt().withExamples(50) // Limit number of tests otherwise n x n tests
takes too long
.forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+ .assuming((keyType, valueType) -> keyType.supportedAsMapKey())
.checkAssert((keyType, valueType) ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1178,6 +1184,7 @@ public class EndToEndTests
{
// pk -> a testudt<b text, c frozen<type>, d int>
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsSetElement)
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1215,6 +1222,7 @@ public class EndToEndTests
// pk -> a testudt<b float, c frozen<set<uuid>>, d frozen<map<type1,
type2>>, e boolean>
qt().withExamples(50)
.forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+ .assuming((type1, type2) -> type1.supportedAsMapKey())
.checkAssert((type1, type2) ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1235,6 +1243,7 @@ public class EndToEndTests
// pk -> col1 udt1<a float, b frozen<set<uuid>>, c frozen<set<type>>,
d boolean>,
// col2 udt2<a text, b bigint, g varchar>, col3 udt3<int, type,
ascii>
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsSetElement)
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1305,6 +1314,7 @@ public class EndToEndTests
// pk -> col1 type1 -> a tuple<int, type2, bigint>
qt().withExamples(10)
.forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+ .assuming((type1, type2) -> type1.supportedAsPrimaryKeyColumn())
.checkAssert((type1, type2) ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1346,6 +1356,7 @@ public class EndToEndTests
// Test set nested within tuple
qt().withExamples(10)
.forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsSetElement)
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1390,6 +1401,7 @@ public class EndToEndTests
// Test map nested within tuple
qt().withExamples(10)
.forAll(TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+ .assuming((type1, type2) -> type1.supportedAsMapKey())
.checkAssert((type1, type2) ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1431,6 +1443,7 @@ public class EndToEndTests
// Test tuple nested within set
qt().withExamples(10)
.forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsSetElement)
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1510,6 +1523,7 @@ public class EndToEndTests
public void testTupleClusteringKey(CassandraBridge bridge)
{
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -1529,6 +1543,7 @@ public class EndToEndTests
public void testUdtClusteringKey(CassandraBridge bridge)
{
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(type ->
Tester.builder(TestSchema.builder(bridge)
.withPartitionKey("pk", bridge.uuid())
@@ -2564,6 +2579,7 @@ public class EndToEndTests
public void
testSinglePartitionAndClusteringKeyWithNullValueColumn(CassandraBridge bridge)
{
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(clusteringKeyType ->
Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.bigint())
.withClusteringKey("b",
clusteringKeyType)
@@ -2578,6 +2594,7 @@ public class EndToEndTests
public void testMultipleValueColumnsWithNullValueColumn(CassandraBridge
bridge)
{
qt().forAll(TestUtils.cql3Type(bridge))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
.checkAssert(clusteringKeyType ->
Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.bigint())
.withClusteringKey("b",
clusteringKeyType)
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java
index c72bddd..dab3f79 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SqlToCqlTypeConverterTest.java
@@ -35,6 +35,7 @@ import static
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.COUNTE
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
import static
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DECIMAL;
import static
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DOUBLE;
+import static
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DURATION;
import static
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.FLOAT;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INET;
import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
@@ -72,6 +73,7 @@ public final class SqlToCqlTypeConverterTest
na(mockCqlType(TIMEUUID),
SqlToCqlTypeConverter.TimeUUIDConverter.class),
na(mockCqlType(INET),
SqlToCqlTypeConverter.InetAddressConverter.class),
na(mockCqlType(DATE),
SqlToCqlTypeConverter.DateConverter.class),
+ na(mockCqlType(DURATION),
SqlToCqlTypeConverter.DurationConverter.class),
na(mockMapCqlType(INT, INT),
SqlToCqlTypeConverter.MapConverter.class),
na(mockSetCqlType(INET),
SqlToCqlTypeConverter.SetConverter.class),
na(mockUdtCqlType("udtType", "f1", TEXT, "f2",
INT, "f3", TIMEUUID), SqlToCqlTypeConverter.UdtConverter.class),
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java
index cb81db3..6a91bf2 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/reader/DataTypeSerializationTests.java
@@ -39,14 +39,18 @@ import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.type.InternalDuration;
import org.apache.cassandra.spark.TestUtils;
+import org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.converter.types.SparkType;
import org.apache.cassandra.spark.utils.RandomUtils;
+import org.apache.cassandra.spark.utils.SparkTypeUtils;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import static org.apache.cassandra.bridge.CassandraBridgeFactory.getSparkSql;
@@ -294,6 +298,18 @@ public class DataTypeSerializationTests
});
}
+ @Test
+ public void testDuration()
+ {
+ qt().forAll(TestUtils.bridges()).checkAssert(bridge -> {
+ CalendarInterval value = SparkTypeUtils.convertDuration(new
InternalDuration(1, 2, 7000000000L));
+ Object converted =
SqlToCqlTypeConverter.getConverter(bridge.duration()).convert(value);
+ Object deserialized = toDuration(bridge, converted);
+ assertInstanceOf(CalendarInterval.class, deserialized);
+ assertEquals(value, deserialized);
+ });
+ }
+
@Test
public void testTimestamp()
{
@@ -425,7 +441,7 @@ public class DataTypeSerializationTests
public void testSet()
{
runTest((partitioner, directory, bridge) ->
- qt().forAll(TestUtils.cql3Type(bridge)).checkAssert(type -> {
+
qt().forAll(TestUtils.cql3Type(bridge)).assuming(CqlField.CqlType::supportedAsSetElement).checkAssert(type
-> {
CqlField.CqlSet set = bridge.set(type);
SparkType sparkType =
getSparkSql(bridge).toSparkType(type);
Set<Object> expected = IntStream.range(0, 128)
@@ -445,38 +461,40 @@ public class DataTypeSerializationTests
public void testMap()
{
runTest((partitioner, directory, bridge) ->
- qt().forAll(TestUtils.cql3Type(bridge),
TestUtils.cql3Type(bridge)).checkAssert((keyType, valueType) -> {
- CqlField.CqlMap map = bridge.map(keyType, valueType);
- SparkType keySparkType =
getSparkSql(bridge).toSparkType(keyType);
- SparkType valueSparkType =
getSparkSql(bridge).toSparkType(valueType);
-
- int count = keyType.cardinality(128);
- Map<Object, Object> expected = new HashMap<>(count);
- for (int entry = 0; entry < count; entry++)
- {
- Object key = null;
- while (key == null || expected.containsKey(key))
+ qt().forAll(TestUtils.cql3Type(bridge),
TestUtils.cql3Type(bridge))
+ .assuming((keyType, valueType) ->
keyType.supportedAsMapKey())
+ .checkAssert((keyType, valueType) -> {
+ CqlField.CqlMap map = bridge.map(keyType, valueType);
+ SparkType keySparkType =
getSparkSql(bridge).toSparkType(keyType);
+ SparkType valueSparkType =
getSparkSql(bridge).toSparkType(valueType);
+
+ int count = keyType.cardinality(128);
+ Map<Object, Object> expected = new HashMap<>(count);
+ for (int entry = 0; entry < count; entry++)
{
- key = keyType.randomValue();
+ Object key = null;
+ while (key == null || expected.containsKey(key))
+ {
+ key = keyType.randomValue();
+ }
+ expected.put(key, valueType.randomValue());
+ }
+ ByteBuffer buffer = map.serialize(expected);
+ ArrayBasedMapData mapData = ((ArrayBasedMapData)
map.deserializeToType(getSparkSql(bridge), buffer));
+ ArrayData keys = mapData.keyArray();
+ ArrayData values = mapData.valueArray();
+ Map<Object, Object> actual = new
HashMap<>(keys.numElements());
+ for (int index = 0; index < keys.numElements();
index++)
+ {
+ Object key =
keySparkType.toTestRowType(keys.get(index,
getSparkSql(bridge).sparkSqlType(keyType)));
+ Object value =
valueSparkType.toTestRowType(values.get(index,
getSparkSql(bridge).sparkSqlType(valueType)));
+ actual.put(key, value);
+ }
+ assertEquals(expected.size(), actual.size());
+ for (Map.Entry<Object, Object> entry :
expected.entrySet())
+ {
+ assertEquals(entry.getValue(),
actual.get(entry.getKey()));
}
- expected.put(key, valueType.randomValue());
- }
- ByteBuffer buffer = map.serialize(expected);
- ArrayBasedMapData mapData = ((ArrayBasedMapData)
map.deserializeToType(getSparkSql(bridge), buffer));
- ArrayData keys = mapData.keyArray();
- ArrayData values = mapData.valueArray();
- Map<Object, Object> actual = new
HashMap<>(keys.numElements());
- for (int index = 0; index < keys.numElements(); index++)
- {
- Object key =
keySparkType.toTestRowType(keys.get(index,
getSparkSql(bridge).sparkSqlType(keyType)));
- Object value =
valueSparkType.toTestRowType(values.get(index,
getSparkSql(bridge).sparkSqlType(valueType)));
- actual.put(key, value);
- }
- assertEquals(expected.size(), actual.size());
- for (Map.Entry<Object, Object> entry : expected.entrySet())
- {
- assertEquals(entry.getValue(),
actual.get(entry.getKey()));
- }
}));
}
@@ -617,6 +635,11 @@ public class DataTypeSerializationTests
return toNative(bridge, CassandraBridge::time, value);
}
+ static Object toDuration(CassandraBridge bridge, Object value)
+ {
+ return toNative(bridge, CassandraBridge::duration, value);
+ }
+
static Object toTimestamp(CassandraBridge bridge, Object value)
{
return toNative(bridge, CassandraBridge::timestamp, value);
diff --git
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
index 22101e0..7df0cb8 100644
---
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
+++
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/sparksql/SparkRowIteratorTests.java
@@ -63,6 +63,7 @@ public class SparkRowIteratorTests
{
// I.e. "create table keyspace.table (a %s, b %s, primary key(a));"
qt().forAll(TestUtils.versions(), TestUtils.cql3Type(bridge),
TestUtils.cql3Type(bridge))
+ .assuming((version, type1, type2) ->
type1.supportedAsPrimaryKeyColumn())
.checkAssert((version, type1, type2) -> runTest(version,
TestSchema.builder(bridge)
.withPartitionKey("a", type1)
.withColumn("b", type2)
@@ -74,6 +75,9 @@ public class SparkRowIteratorTests
public void testMultiPartitionKeys(CassandraBridge bridge)
{
qt().forAll(TestUtils.versions(), TestUtils.cql3Type(bridge),
TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge))
+ .assuming((version, type1, type2, type3) ->
type1.supportedAsPrimaryKeyColumn()
+ &&
type2.supportedAsPrimaryKeyColumn()
+ &&
type3.supportedAsPrimaryKeyColumn())
.checkAssert((version, type1, type2, type3) -> runTest(version,
TestSchema.builder(bridge)
.withPartitionKey("a", type1)
.withPartitionKey("b", type2)
@@ -89,6 +93,7 @@ public class SparkRowIteratorTests
for (CassandraVersion version : TestUtils.testableVersions())
{
qt().forAll(TestUtils.cql3Type(bridge),
TestUtils.cql3Type(bridge), TestUtils.cql3Type(bridge), TestUtils.sortOrder())
+ .assuming((type1, type2, type3, order) ->
type1.supportedAsPrimaryKeyColumn() && type2.supportedAsPrimaryKeyColumn())
.checkAssert((type1, type2, type3, order) -> runTest(version,
TestSchema.builder(bridge)
.withPartitionKey("a", type1)
.withClusteringKey("b", type2)
@@ -105,6 +110,7 @@ public class SparkRowIteratorTests
for (CassandraVersion version : TestUtils.testableVersions())
{
qt().forAll(TestUtils.cql3Type(bridge),
TestUtils.cql3Type(bridge), TestUtils.sortOrder(), TestUtils.sortOrder())
+ .assuming((type1, type2, order1, order2) ->
type1.supportedAsPrimaryKeyColumn() && type2.supportedAsPrimaryKeyColumn())
.checkAssert((type1, type2, order1, order2) ->
runTest(version, TestSchema.builder(bridge)
.withPartitionKey("a", bridge.bigint())
.withClusteringKey("b", type1)
diff --git
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
index c63accd..3492879 100644
---
a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
+++
b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BulkWriteDataTypesTest.java
@@ -43,10 +43,12 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.bridge.type.InternalDuration;
import org.apache.cassandra.sidecar.testing.QualifiedName;
import org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.ScalaConversionUtils;
+import org.apache.cassandra.spark.utils.SparkTypeUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -58,6 +60,7 @@ import scala.collection.mutable.Seq;
import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
import static org.apache.spark.sql.types.DataTypes.BinaryType;
+import static org.apache.spark.sql.types.DataTypes.CalendarIntervalType;
import static org.apache.spark.sql.types.DataTypes.DateType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.LongType;
@@ -147,6 +150,9 @@ class BulkWriteDataTypesTest extends
SharedClusterSparkIntegrationTestBase
// Simple schema with Date as column.
types.add(simpleDateSchemaSetup());
+ // Simple schema with Duration as column.
+ types.add(simpleDurationSchemaSetup());
+
// Simple schema with integers and strings as columns.
types.add(integersAndStringsSchemaSetup());
@@ -214,6 +220,16 @@ class BulkWriteDataTypesTest extends
SharedClusterSparkIntegrationTestBase
return setup;
}
+ static TypeTestSetup simpleDurationSchemaSetup()
+ {
+ return new TypeTestSetup("duration_schema",
+ Arrays.asList("id", "took"),
+ Arrays.asList(IntegerType,
CalendarIntervalType),
+ Arrays.asList(INTEGER_MAPPER,
DURATION_MAPPER),
+ "CREATE TABLE %s (id int, took duration,
PRIMARY KEY (id))",
+ "Cannot save interval data type into external
storage.");
+ }
+
static TypeTestSetup integersAndStringsSchemaSetup()
{
return new TypeTestSetup("simple_schema",
@@ -441,6 +457,8 @@ class BulkWriteDataTypesTest extends
SharedClusterSparkIntegrationTestBase
= recordNumber -> Timestamp.from(new
Date(1731457509115L).toInstant().plus(recordNumber, ChronoUnit.SECONDS));
static final Function<Integer, Object> DATE_MAPPER
= recordNumber -> java.sql.Date.valueOf(((Timestamp)
TIMESTAMP_MAPPER.apply(recordNumber)).toLocalDateTime().toLocalDate());
+ static final Function<Integer, Object> DURATION_MAPPER
+ = recordNumber -> SparkTypeUtils.convertDuration(new InternalDuration(1,
recordNumber, recordNumber * 1000000000));
static class TypeTestSetup
{
diff --git
a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/SparkDuration.java
b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/SparkDuration.java
index 6cba57c..e976e5f 100644
---
a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/SparkDuration.java
+++
b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/data/converter/types/SparkDuration.java
@@ -19,12 +19,59 @@
package org.apache.cassandra.spark.data.converter.types;
-public class SparkDuration implements NotImplementedFeatures
+import java.util.Comparator;
+
+import org.apache.cassandra.bridge.BigNumberConfig;
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.cassandra.spark.utils.SparkTypeUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.jetbrains.annotations.NotNull;
+
+public class SparkDuration implements SparkType
{
public static final SparkDuration INSTANCE = new SparkDuration();
+ private static final Comparator<CalendarInterval>
CALENDAR_INTERVAL_COMPARATOR =
+ SparkTypeUtils.CALENDAR_INTERVAL_COMPARATOR;
+
+ @Override
+ public DataType dataType(BigNumberConfig bigNumberConfig)
+ {
+ return DataTypes.CalendarIntervalType;
+ }
+
+ @Override
+ public Object nativeSparkSqlRowValue(Row row, int position)
+ {
+ return row.isNullAt(position) ? null : row.get(position); // should
return CalendarInterval type
+ }
- private SparkDuration()
+ @Override
+ public Object nativeSparkSqlRowValue(final GenericInternalRow row, final
int position)
{
+ return row.isNullAt(position) ? null : row.getInterval(position);
+ }
+
+ @Override
+ public Object toSparkSqlType(@NotNull Object value, boolean isFrozen)
+ {
+ InternalDuration duration = (InternalDuration) value;
+ return SparkTypeUtils.convertDuration(duration);
+ }
+ @Override
+ public Object toTestRowType(Object value)
+ {
+ CalendarInterval ci = (CalendarInterval) value;
+ return SparkTypeUtils.convertDuration(ci);
+ }
+
+ @Override
+ public int compareTo(Object first, Object second)
+ {
+ return CALENDAR_INTERVAL_COMPARATOR.compare((CalendarInterval) first,
(CalendarInterval) second);
}
}
diff --git
a/cassandra-analytics-spark-converter/src/main/scala-2.11-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
b/cassandra-analytics-spark-converter/src/main/scala-2.11-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
new file mode 100644
index 0000000..4df97e3
--- /dev/null
+++
b/cassandra-analytics-spark-converter/src/main/scala-2.11-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.spark.unsafe.types.CalendarInterval;
+
+public final class SparkTypeUtils
+{
+ private SparkTypeUtils()
+ {
+ throw new IllegalStateException(getClass() + " is static utility class
and shall not be instantiated");
+ }
+
+ public static final Comparator<CalendarInterval>
CALENDAR_INTERVAL_COMPARATOR =
+ Comparator.<CalendarInterval>comparingInt(interval -> interval.months)
+ .thenComparingLong(interval -> interval.microseconds);
+
+ public static CalendarInterval convertDuration(InternalDuration duration)
+ {
+ // Unfortunately, it loses precision when converting to the spark data
type.
+ long micros = TimeUnit.NANOSECONDS.toMicros(duration.nanoseconds);
+ micros += duration.days * CalendarInterval.MICROS_PER_DAY;
+ return new CalendarInterval(duration.months, micros);
+ }
+
+ public static InternalDuration convertDuration(CalendarInterval interval)
+ {
+ int days = Ints.checkedCast(interval.microseconds /
CalendarInterval.MICROS_PER_DAY);
+ long microsRemain = interval.microseconds %
CalendarInterval.MICROS_PER_DAY;
+ return new InternalDuration(interval.months, days,
TimeUnit.MICROSECONDS.toNanos(microsRemain));
+ }
+}
diff --git
a/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
b/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
new file mode 100644
index 0000000..4df97e3
--- /dev/null
+++
b/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-2/org/apache/cassandra/spark/utils/SparkTypeUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.spark.unsafe.types.CalendarInterval;
+
+public final class SparkTypeUtils
+{
+ private SparkTypeUtils()
+ {
+ throw new IllegalStateException(getClass() + " is static utility class
and shall not be instantiated");
+ }
+
+ public static final Comparator<CalendarInterval>
CALENDAR_INTERVAL_COMPARATOR =
+ Comparator.<CalendarInterval>comparingInt(interval -> interval.months)
+ .thenComparingLong(interval -> interval.microseconds);
+
+ public static CalendarInterval convertDuration(InternalDuration duration)
+ {
+ // Unfortunately, it loses precision when converting to the spark data
type.
+ long micros = TimeUnit.NANOSECONDS.toMicros(duration.nanoseconds);
+ micros += duration.days * CalendarInterval.MICROS_PER_DAY;
+ return new CalendarInterval(duration.months, micros);
+ }
+
+ public static InternalDuration convertDuration(CalendarInterval interval)
+ {
+ int days = Ints.checkedCast(interval.microseconds /
CalendarInterval.MICROS_PER_DAY);
+ long microsRemain = interval.microseconds %
CalendarInterval.MICROS_PER_DAY;
+ return new InternalDuration(interval.months, days,
TimeUnit.MICROSECONDS.toNanos(microsRemain));
+ }
+}
diff --git
a/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
b/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
new file mode 100644
index 0000000..4d200b3
--- /dev/null
+++
b/cassandra-analytics-spark-converter/src/main/scala-2.12-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.spark.unsafe.types.CalendarInterval;
+
+public final class SparkTypeUtils
+{
+ private SparkTypeUtils()
+ {
+ throw new IllegalStateException(getClass() + " is static utility class
and shall not be instantiated");
+ }
+
+ public static final Comparator<CalendarInterval>
CALENDAR_INTERVAL_COMPARATOR =
+ Comparator.<CalendarInterval>comparingInt(interval -> interval.months)
+ .thenComparingInt(interval -> interval.days)
+ .thenComparingLong(interval -> interval.microseconds);
+
+ public static CalendarInterval convertDuration(InternalDuration duration)
+ {
+ // Unfortunately, it loses precision when converting to the spark data
type.
+ return new CalendarInterval(duration.months, duration.days,
TimeUnit.NANOSECONDS.toMicros(duration.nanoseconds));
+ }
+
+ public static InternalDuration convertDuration(CalendarInterval interval)
+ {
+ return new InternalDuration(interval.months, interval.days,
TimeUnit.MICROSECONDS.toNanos(interval.microseconds));
+ }
+}
diff --git
a/cassandra-analytics-spark-converter/src/main/scala-2.13-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
b/cassandra-analytics-spark-converter/src/main/scala-2.13-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
new file mode 100644
index 0000000..4d200b3
--- /dev/null
+++
b/cassandra-analytics-spark-converter/src/main/scala-2.13-spark-3/org/apache/cassandra/spark/utils/SparkTypeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.utils;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.spark.unsafe.types.CalendarInterval;
+
+public final class SparkTypeUtils
+{
+ private SparkTypeUtils()
+ {
+ throw new IllegalStateException(getClass() + " is static utility class
and shall not be instantiated");
+ }
+
+ public static final Comparator<CalendarInterval>
CALENDAR_INTERVAL_COMPARATOR =
+ Comparator.<CalendarInterval>comparingInt(interval -> interval.months)
+ .thenComparingInt(interval -> interval.days)
+ .thenComparingLong(interval -> interval.microseconds);
+
+ public static CalendarInterval convertDuration(InternalDuration duration)
+ {
+ // Unfortunately, it loses precision when converting to the spark data
type.
+ return new CalendarInterval(duration.months, duration.days,
TimeUnit.NANOSECONDS.toMicros(duration.nanoseconds));
+ }
+
+ public static InternalDuration convertDuration(CalendarInterval interval)
+ {
+ return new InternalDuration(interval.months, interval.days,
TimeUnit.MICROSECONDS.toNanos(interval.microseconds));
+ }
+}
diff --git
a/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java
b/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java
index 46e7a7d..b667185 100644
---
a/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java
+++
b/cassandra-bridge/src/testFixtures/java/org/apache/cassandra/spark/utils/test/TestSchema.java
@@ -754,14 +754,14 @@ public final class TestSchema
Object[] result = new Object[end - start];
for (int sourceIndex = start, destinationIndex = 0; sourceIndex <
end; sourceIndex++, destinationIndex++)
{
- result[destinationIndex] =
convertForCqlWriter(getType(sourceIndex), values[sourceIndex]);
+ result[destinationIndex] =
convertForCqlWriter(getType(sourceIndex), values[sourceIndex], false);
}
return result;
}
- private Object convertForCqlWriter(CqlField.CqlType type, Object value)
+ private Object convertForCqlWriter(CqlField.CqlType type, Object
value, boolean isCollectionElement)
{
- return type.convertForCqlWriter(value, version);
+ return type.convertForCqlWriter(value, version,
isCollectionElement);
}
public CqlField.CqlType getType(int position)
diff --git
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/DateTypeTests.java
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/DateTypeTests.java
index 73de41f..d4e72a0 100644
---
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/DateTypeTests.java
+++
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/DateTypeTests.java
@@ -49,7 +49,7 @@ public class DateTypeTests
assertEquals(2021, end.getYear());
assertEquals(7, end.getMonthValue());
assertEquals(16, end.getDayOfMonth());
- Object cqlWriterObj = Date.INSTANCE.convertForCqlWriter(numDays,
BRIDGE.getVersion());
+ Object cqlWriterObj = Date.INSTANCE.convertForCqlWriter(numDays,
BRIDGE.getVersion(), false);
org.apache.cassandra.cql3.functions.types.LocalDate cqlWriterDate =
(org.apache.cassandra.cql3.functions.types.LocalDate) cqlWriterObj;
assertEquals(2021, cqlWriterDate.getYear());
assertEquals(7, cqlWriterDate.getMonth());
diff --git
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/PartitionKeyTests.java
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/PartitionKeyTests.java
index 9a1aabe..2373434 100644
---
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/PartitionKeyTests.java
+++
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/PartitionKeyTests.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
import org.apache.cassandra.bridge.CassandraBridgeImplementation;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.CqlType;
import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter;
@@ -49,24 +50,26 @@ public class PartitionKeyTests
@SuppressWarnings("static-access")
public void testBuildPartitionKey()
{
-
qt().forAll(arbitrary().pick(BRIDGE.supportedTypes())).checkAssert(partitionKeyType
-> {
- CqlTable table = TestSchema.builder(BRIDGE)
- .withPartitionKey("a", partitionKeyType)
- .withClusteringKey("b", BRIDGE.aInt())
- .withColumn("c", BRIDGE.aInt())
- .build()
- .buildTable();
- Object value = partitionKeyType.randomValue(100);
- String string = ((CqlType)
partitionKeyType).serializer().toString(value);
- ByteBuffer buffer = BRIDGE.buildPartitionKey(table,
Collections.singletonList(string));
- Object cassandraValue =
partitionKeyType.deserializeToJavaType(buffer);
+ qt().forAll(arbitrary().pick(BRIDGE.supportedTypes()))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
+ .checkAssert(partitionKeyType -> {
+ CqlTable table = TestSchema.builder(BRIDGE)
+ .withPartitionKey("a",
partitionKeyType)
+ .withClusteringKey("b",
BRIDGE.aInt())
+ .withColumn("c", BRIDGE.aInt())
+ .build()
+ .buildTable();
+ Object value = partitionKeyType.randomValue(100);
+ String string = ((CqlType)
partitionKeyType).serializer().toString(value);
+ ByteBuffer buffer = BRIDGE.buildPartitionKey(table,
Collections.singletonList(string));
+ Object cassandraValue =
partitionKeyType.deserializeToJavaType(buffer);
- // compare using Cassandra types
- assertTrue(ComparisonUtils.equals(value, cassandraValue));
+ // compare using Cassandra types
+ assertTrue(ComparisonUtils.equals(value, cassandraValue));
- // convert SparkSQL types back into test row types to compare
- Object sparkSqlValue = TYPE_CONVERTER.convert(partitionKeyType,
cassandraValue, false);
- assertTrue(ComparisonUtils.equals(value,
TYPE_CONVERTER.toTestRowType(partitionKeyType, sparkSqlValue)));
+ // convert SparkSQL types back into test row types to compare
+ Object sparkSqlValue =
TYPE_CONVERTER.convert(partitionKeyType, cassandraValue, false);
+ assertTrue(ComparisonUtils.equals(value,
TYPE_CONVERTER.toTestRowType(partitionKeyType, sparkSqlValue)));
});
}
@@ -74,30 +77,32 @@ public class PartitionKeyTests
@SuppressWarnings("static-access")
public void testBuildCompositePartitionKey()
{
-
qt().forAll(arbitrary().pick(BRIDGE.supportedTypes())).checkAssert(partitionKeyType
-> {
- CqlTable table = TestSchema.builder(BRIDGE)
- .withPartitionKey("a", BRIDGE.aInt())
- .withPartitionKey("b", partitionKeyType)
- .withPartitionKey("c", BRIDGE.text())
- .withClusteringKey("d", BRIDGE.aInt())
- .withColumn("e", BRIDGE.aInt())
- .build()
- .buildTable();
- List<AbstractType<?>> partitionKeyColumnTypes =
BRIDGE.partitionKeyColumnTypes(table);
- CompositeType compositeType =
CompositeType.getInstance(partitionKeyColumnTypes);
+ qt().forAll(arbitrary().pick(BRIDGE.supportedTypes()))
+ .assuming(CqlField.CqlType::supportedAsPrimaryKeyColumn)
+ .checkAssert(partitionKeyType -> {
+ CqlTable table = TestSchema.builder(BRIDGE)
+ .withPartitionKey("a",
BRIDGE.aInt())
+ .withPartitionKey("b",
partitionKeyType)
+ .withPartitionKey("c",
BRIDGE.text())
+ .withClusteringKey("d",
BRIDGE.aInt())
+ .withColumn("e", BRIDGE.aInt())
+ .build()
+ .buildTable();
+ List<AbstractType<?>> partitionKeyColumnTypes =
BRIDGE.partitionKeyColumnTypes(table);
+ CompositeType compositeType =
CompositeType.getInstance(partitionKeyColumnTypes);
- int columnA = (int) BRIDGE.aInt().randomValue(1024);
- Object columnB = partitionKeyType.randomValue(1024);
- String columnBString = ((CqlType)
partitionKeyType).serializer().toString(columnB);
- String columnC = (String) BRIDGE.text().randomValue(1024);
+ int columnA = (int) BRIDGE.aInt().randomValue(1024);
+ Object columnB = partitionKeyType.randomValue(1024);
+ String columnBString = ((CqlType)
partitionKeyType).serializer().toString(columnB);
+ String columnC = (String) BRIDGE.text().randomValue(1024);
- ByteBuffer buffer = BRIDGE.buildPartitionKey(table,
Arrays.asList(Integer.toString(columnA), columnBString, columnC));
- ByteBuffer[] buffers = compositeType.split(buffer);
- assertEquals(3, buffers.length);
+ ByteBuffer buffer = BRIDGE.buildPartitionKey(table,
Arrays.asList(Integer.toString(columnA), columnBString, columnC));
+ ByteBuffer[] buffers = compositeType.split(buffer);
+ assertEquals(3, buffers.length);
- assertEquals(columnA, buffers[0].getInt());
- assertEquals(columnB,
partitionKeyType.deserializeToJavaType(buffers[1]));
- assertEquals(columnC,
TYPE_CONVERTER.toSparkType(BRIDGE.text()).toTestRowType(BRIDGE.text().deserializeToJavaType(buffers[2])));
+ assertEquals(columnA, buffers[0].getInt());
+ assertEquals(columnB,
partitionKeyType.deserializeToJavaType(buffers[1]));
+ assertEquals(columnC,
TYPE_CONVERTER.toSparkType(BRIDGE.text()).toTestRowType(BRIDGE.text().deserializeToJavaType(buffers[2])));
});
}
}
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
index 8c59e64..a3117f4 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/NativeType.java
@@ -54,7 +54,7 @@ public abstract class NativeType extends CqlType implements
CqlField.NativeType
}
@Override
- public Object convertForCqlWriter(Object value, CassandraVersion version)
+ public Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement)
{
return value;
}
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
index fd7928f..dd14ee3 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlFrozen.java
@@ -129,9 +129,9 @@ public class CqlFrozen extends CqlType implements
CqlField.CqlFrozen
}
@Override
- public Object convertForCqlWriter(Object value, CassandraVersion version)
+ public Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement)
{
- return inner.convertForCqlWriter(value, version);
+ return inner.convertForCqlWriter(value, version, false);
}
@Override
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
index 6a47000..f04c1c2 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlList.java
@@ -93,10 +93,10 @@ public class CqlList extends CqlCollection implements
CqlField.CqlList
}
@Override
- public Object convertForCqlWriter(Object value, CassandraVersion version)
+ public Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement)
{
return ((List<?>) value).stream()
- .map(element ->
type().convertForCqlWriter(element, version))
+ .map(element ->
type().convertForCqlWriter(element, version, true))
.collect(Collectors.toList());
}
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
index 39edf62..6d0e49d 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlMap.java
@@ -107,12 +107,12 @@ public class CqlMap extends CqlCollection implements
CqlField.CqlMap
@Override
@SuppressWarnings("unchecked")
- public Object convertForCqlWriter(Object value, CassandraVersion version)
+ public Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement)
{
Map<Object, Object> map = (Map<Object, Object>) value;
return map.entrySet().stream()
- .collect(Collectors.toMap(element ->
keyType().convertForCqlWriter(element.getKey(), version),
- element ->
valueType().convertForCqlWriter(element.getValue(), version)));
+ .collect(Collectors.toMap(element ->
keyType().convertForCqlWriter(element.getKey(), version, true),
+ element ->
valueType().convertForCqlWriter(element.getValue(), version, true)));
}
@Override
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
index 84f649d..c50b805 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlSet.java
@@ -92,10 +92,10 @@ public class CqlSet extends CqlList implements
CqlField.CqlSet
}
@Override
- public Object convertForCqlWriter(Object value, CassandraVersion version)
+ public Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement)
{
return ((Set<?>) value).stream()
- .map(element ->
type().convertForCqlWriter(element, version))
+ .map(element ->
type().convertForCqlWriter(element, version, true))
.collect(Collectors.toSet());
}
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
index 32f3bab..28986a1 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlTuple.java
@@ -150,7 +150,7 @@ public class CqlTuple extends CqlCollection implements
CqlField.CqlTuple
}
@Override
- public Object convertForCqlWriter(Object value, CassandraVersion version)
+ public Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement)
{
return toTupleValue(version, this, value);
}
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
index 8477c2b..31a8187 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlUdt.java
@@ -113,7 +113,7 @@ public class CqlUdt extends CqlType implements
CqlField.CqlUdt
}
@Override
- public Object convertForCqlWriter(@NotNull Object value, CassandraVersion
version)
+ public Object convertForCqlWriter(@NotNull Object value, CassandraVersion
version, boolean isCollectionElement)
{
if (value instanceof UDTValue)
{
@@ -419,7 +419,7 @@ public class CqlUdt extends CqlType implements
CqlField.CqlUdt
int position,
@Nullable Object value)
{
- type.setInnerValue(udtValue, position, value == null ? null :
type.convertForCqlWriter(value, version));
+ type.setInnerValue(udtValue, position, value == null ? null :
type.convertForCqlWriter(value, version, false));
}
public static UserType toUserType(CqlUdt udt)
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
index 0f0044e..e8d9978 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Date.java
@@ -62,7 +62,7 @@ public class Date extends NativeType
}
@Override
- public Object convertForCqlWriter(Object value, CassandraVersion version)
+ public Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement)
{
// Cassandra 4.0 no longer allows writing date types as Integers in
CqlWriter,
// so we need to convert to LocalDate before writing in tests
diff --git
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
index 1e8ca98..264f8fa 100644
---
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
+++
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/data/types/Duration.java
@@ -19,7 +19,22 @@
package org.apache.cassandra.spark.data.types;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.bridge.type.InternalDuration;
+import org.apache.cassandra.cql3.functions.types.DataType;
+import org.apache.cassandra.cql3.functions.types.SettableByIndexData;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.DurationType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.serializers.TypeSerializer;
import org.apache.cassandra.spark.data.NativeType;
+import org.apache.cassandra.spark.utils.RandomUtils;
+import org.jetbrains.annotations.NotNull;
public class Duration extends NativeType
{
@@ -32,8 +47,141 @@ public class Duration extends NativeType
}
@Override
- public boolean isSupported()
+ public AbstractType<?> dataType()
+ {
+ return DurationType.instance;
+ }
+
+ public <T> TypeSerializer<T> serializer()
+ {
+ return (TypeSerializer<T>) AnalyticsDurationSerializer.INSTANCE;
+ }
+
+ @Override
+ public DataType driverDataType(boolean isFrozen)
+ {
+ return DataType.duration();
+ }
+
+ @Override
+ public boolean supportedAsPrimaryKeyColumn()
{
return false;
}
+
+ @Override
+ public boolean supportedAsMapKey()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean supportedAsSetElement()
+ {
+ return false;
+ }
+
+ @Override
+ public Object convertForCqlWriter(Object value, CassandraVersion version,
boolean isCollectionElement)
+ {
+ InternalDuration duration = (InternalDuration) value;
+ return isCollectionElement
+ ? AnalyticsDurationSerializer.toCql3FunctionDuration(duration)
+ : AnalyticsDurationSerializer.toCql3Duration(duration);
+ }
+
+ @Override
+ protected void setInnerValueInternal(SettableByIndexData<?> udtValue, int
position, @NotNull Object value)
+ {
+ org.apache.cassandra.cql3.functions.types.Duration duration = null;
+ if (value instanceof InternalDuration)
+ {
+ duration =
AnalyticsDurationSerializer.toCql3FunctionDuration((InternalDuration) value);
+ }
+ else if (value instanceof org.apache.cassandra.cql3.Duration)
+ {
+ duration =
AnalyticsDurationSerializer.toCql3FunctionDuration((org.apache.cassandra.cql3.Duration)
value);
+ }
+ else
+ {
+ duration = (org.apache.cassandra.cql3.functions.types.Duration)
value;
+ }
+ udtValue.set(position, duration,
org.apache.cassandra.cql3.functions.types.Duration.class);
+ }
+
+ @Override
+ public Object randomValue(int minCollectionSize)
+ {
+ return new InternalDuration(RandomUtils.randomPositiveInt(100),
+ RandomUtils.randomPositiveInt(100),
+
TimeUnit.MICROSECONDS.toNanos(RandomUtils.randomPositiveInt(1000000)));
+ }
+
+    /**
+     * Serializes Spark {@code CalendarInterval} type as CQL {@link
org.apache.cassandra.cql3.Duration}.
+     * Implementation note: a dedicated {@code TypeSerializer<InternalDuration>}
is used to prevent a class cast exception from
+     * {@code CalendarInterval}, which is not a direct dependency of this module.
+     */
+ private static class AnalyticsDurationSerializer extends
TypeSerializer<InternalDuration>
+ {
+ private static final AnalyticsDurationSerializer INSTANCE = new
AnalyticsDurationSerializer();
+
+ @Override
+ public ByteBuffer serialize(InternalDuration value)
+ {
+ org.apache.cassandra.cql3.Duration cqlDuration =
toCql3Duration(value);
+ return
org.apache.cassandra.serializers.DurationSerializer.instance.serialize(cqlDuration);
+ }
+
+ public <V> InternalDuration deserialize(V v, ValueAccessor<V>
valueAccessor)
+ {
+ org.apache.cassandra.cql3.Duration cqlDuration =
org.apache.cassandra.serializers.DurationSerializer.instance.deserialize(v,
valueAccessor);
+ return fromCql3Duration(cqlDuration);
+ }
+
+ public <V> void validate(V v, ValueAccessor<V> valueAccessor) throws
MarshalException
+ {
+
org.apache.cassandra.serializers.DurationSerializer.instance.validate(v,
valueAccessor);
+ }
+
+ public String toString(InternalDuration duration)
+ {
+ return duration == null ? "" : duration.toString();
+ }
+
+ public Class<InternalDuration> getType()
+ {
+ return InternalDuration.class;
+ }
+
+ public static org.apache.cassandra.cql3.Duration
toCql3Duration(InternalDuration duration)
+ {
+ return nullOrConvert(duration, d ->
org.apache.cassandra.cql3.Duration.newInstance(d.months, d.days,
d.nanoseconds));
+ }
+
+ public static org.apache.cassandra.cql3.functions.types.Duration
toCql3FunctionDuration(InternalDuration duration)
+ {
+ return nullOrConvert(duration, d ->
org.apache.cassandra.cql3.functions.types.Duration.newInstance(d.months,
d.days, d.nanoseconds));
+ }
+
+ public static org.apache.cassandra.cql3.functions.types.Duration
toCql3FunctionDuration(org.apache.cassandra.cql3.Duration duration)
+ {
+ return nullOrConvert(duration, d ->
org.apache.cassandra.cql3.functions.types.Duration.newInstance(d.getMonths(),
d.getDays(), d.getNanoseconds()));
+ }
+
+ public static InternalDuration
fromCql3Duration(org.apache.cassandra.cql3.Duration duration)
+ {
+ return nullOrConvert(duration, d -> new
InternalDuration(d.getMonths(), d.getDays(), d.getNanoseconds()));
+ }
+
+ private static <I, O> O nullOrConvert(I input, Function<I, O>
converter)
+ {
+ if (input == null)
+ {
+ return null;
+ }
+
+ return converter.apply(input);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]