This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8dce35f  CASSANDRA-19716: Invalid mapping when timestamp is used as a partition key during bulk writes (#60)
8dce35f is described below

commit 8dce35f1cb3c204be669548ee286055b12e67fe9
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Thu Jun 20 14:24:21 2024 -0700

    CASSANDRA-19716: Invalid mapping when timestamp is used as a partition key during bulk writes (#60)
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19716
---
 CHANGES.txt                                        |   1 +
 .../spark/bulkwriter/CqlTableInfoProvider.java     |   2 +-
 .../cassandra/spark/common/schema/BooleanType.java |   6 -
 .../cassandra/spark/common/schema/BytesType.java   |   6 -
 .../spark/common/schema/CollectionType.java        |   7 +-
 .../cassandra/spark/common/schema/ColumnType.java  |   7 -
 .../cassandra/spark/common/schema/ColumnTypes.java |  68 +-----
 .../cassandra/spark/common/schema/DoubleType.java  |   6 -
 .../cassandra/spark/common/schema/IntegerType.java |  13 +-
 .../cassandra/spark/common/schema/LongType.java    |  12 +-
 .../cassandra/spark/common/schema/StringType.java  |  14 +-
 .../spark/common/schema/StringUuidType.java        |   6 -
 .../schema/{ColumnType.java => TimestampType.java} |  34 +--
 .../cassandra/spark/common/schema/UuidType.java    |   6 -
 .../distributed/impl/CassandraCluster.java         |   1 -
 .../analytics/PartitionKeyIntegrationTest.java     | 249 +++++++++++++++++++++
 .../apache/cassandra/analytics/SparkTestUtils.java |  29 ++-
 17 files changed, 308 insertions(+), 159 deletions(-)
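
The functional core of the patch is the one-line mapping change in CqlTableInfoProvider below: CQL TIMESTAMP columns previously resolved to ColumnTypes.LONG, so a timestamp partition key was handled as a bare long and produced the invalid mapping during bulk writes; TIMESTAMP now resolves to the new Date-based ColumnTypes.TIMESTAMP. A minimal sketch of the corrected lookup, using only types that appear in this diff (illustrative, not part of the patch):

    // The TIMESTAMP entry now yields a ColumnType<java.util.Date> rather than ColumnType<Long>
    ColumnType<java.util.Date> idType = ColumnTypes.TIMESTAMP;
    // Serialization writes the Date as its 8-byte epoch-millisecond value
    java.nio.ByteBuffer serialized = idType.serialize(new java.util.Date());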

diff --git a/CHANGES.txt b/CHANGES.txt
index 4c16a54..2e654ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Invalid mapping when timestamp is used as a partition key during bulk writes (CASSANDRA-19716)
  * NullPointerException when reading static column with null values (CASSANDRA-19626)
  * Integrate with the latest sidecar client (CASSANDRA-19616)
  * Support bulk write via S3 (CASSANDRA-19563)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java
index 29977cf..981c24c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java
@@ -68,7 +68,7 @@ public class CqlTableInfoProvider implements TableInfoProvider
                                                                              .put(INT, ColumnTypes.INT)
                                                                              .put(BOOLEAN, ColumnTypes.BOOLEAN)
                                                                              .put(TEXT, ColumnTypes.STRING)
-                                                                             .put(TIMESTAMP, ColumnTypes.LONG)
+                                                                             .put(TIMESTAMP, ColumnTypes.TIMESTAMP)
                                                                              .put(UUID, ColumnTypes.UUID)
                                                                              .put(VARCHAR, ColumnTypes.STRING)
                                                                              .put(ASCII, ColumnTypes.STRING)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BooleanType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BooleanType.java
index 8bfb555..5b998fd 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BooleanType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BooleanType.java
@@ -37,12 +37,6 @@ public class BooleanType implements ColumnType<Boolean>
         return getBoolean(ByteBuffer.wrap(value));
     }
 
-    @Override
-    public Boolean getDefault()
-    {
-        return false;
-    }
-
     @Override
     public ByteBuffer serialize(Boolean value)
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BytesType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BytesType.java
index e4b03b3..48fd87d 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BytesType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BytesType.java
@@ -32,12 +32,6 @@ public class BytesType implements ColumnType<ByteBuffer>
         return ByteBuffer.wrap(value);
     }
 
-    @Override
-    public ByteBuffer getDefault()
-    {
-        return ByteBuffer.wrap(new byte[0]);
-    }
-
     @Override
     public ByteBuffer serialize(ByteBuffer value)
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/CollectionType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/CollectionType.java
index a786ef3..62a735f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/CollectionType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/CollectionType.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.spark.common.schema;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 public abstract class CollectionType<EntryType, IntermediateType> implements ColumnType<Collection<EntryType>>, Serializable
@@ -35,13 +34,9 @@ public abstract class CollectionType<EntryType, IntermediateType> implements Col
 
     public abstract IntermediateType parseCollectionColumn(ByteBuffer colNameSuffix, ByteBuffer colValue);
 
-    public Collection<EntryType> getDefault()
-    {
-        return Collections.emptyList();
-    }
-
     public abstract List<EntryType> finaliseCollection(List<IntermediateType> entryList);
 
+    @Override
     public ByteBuffer serialize(Collection<EntryType> value)
     {
         throw new UnsupportedOperationException("Doesn't make much sense");
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java
index 9a39c59..7eccc8b 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java
@@ -34,13 +34,6 @@ public interface ColumnType<T> extends Serializable
      */
     T parseColumn(ByteBuffer buffer, int length);
 
-    /**
-     * Default value, in case column value doesn't exist for CQL row
-     *
-     * @return the default value for the column type
-     */
-    T getDefault();
-
     /**
      * Serialize into ByteBuffer and keeps the position at beginning of ByteBuffer
      *
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnTypes.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnTypes.java
index a6d35d8..e6f5cae 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnTypes.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnTypes.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.common.schema;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Date;
 
 @SuppressWarnings("unused")  // Extra types can become useful in the future
 public final class ColumnTypes implements Serializable
@@ -34,72 +35,7 @@ public final class ColumnTypes implements Serializable
     public static final ColumnType<java.util.UUID> UUID = new UuidType();
     public static final ColumnType<Double> DOUBLE = new DoubleType();
     public static final ColumnType<Boolean> BOOLEAN = new BooleanType();
-
-    // Nullable Types
-    public static final ColumnType<String> NULLABLE_STRING = new StringType()
-    {
-        @Override
-        public String getDefault()
-        {
-            return null;
-        }
-    };
-    public static final ColumnType<Integer> NULLABLE_INT = new IntegerType()
-    {
-        @Override
-        public Integer getDefault()
-        {
-            return null;
-        }
-    };
-    public static final ColumnType<Long> NULLABLE_LONG = new LongType()
-    {
-        @Override
-        public Long getDefault()
-        {
-            return null;
-        }
-    };
-    public static final ColumnType<ByteBuffer> NULLABLE_BYTES = new BytesType()
-    {
-        @Override
-        public ByteBuffer getDefault()
-        {
-            return null;
-        }
-    };
-    public static final ColumnType<String> NULLABLE_STRING_UUID = new StringUuidType()
-    {
-        @Override
-        public String getDefault()
-        {
-            return null;
-        }
-    };
-    public static final ColumnType<java.util.UUID> NULLABLE_UUID = new UuidType()
-    {
-        @Override
-        public java.util.UUID getDefault()
-        {
-            return null;
-        }
-    };
-    public static final ColumnType<Double> NULLABLE_DOUBLE = new DoubleType()
-    {
-        @Override
-        public Double getDefault()
-        {
-            return null;
-        }
-    };
-    public static final ColumnType<Boolean> NULLABLE_BOOLEAN = new BooleanType()
-    {
-        @Override
-        public Boolean getDefault()
-        {
-            return null;
-        }
-    };
+    public static final ColumnType<Date> TIMESTAMP = new TimestampType();
 
     private ColumnTypes()
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/DoubleType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/DoubleType.java
index 22b78bb..f59d792 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/DoubleType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/DoubleType.java
@@ -36,12 +36,6 @@ public class DoubleType implements ColumnType<Double>
         return getDouble(ByteBuffer.wrap(value));
     }
 
-    @Override
-    public Double getDefault()
-    {
-        return 0d;
-    }
-
     @Override
     public ByteBuffer serialize(Double value)
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/IntegerType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/IntegerType.java
index 5c7f113..d3688c5 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/IntegerType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/IntegerType.java
@@ -23,22 +23,19 @@ import java.nio.ByteBuffer;
 
 public class IntegerType implements ColumnType<Integer>
 {
+
+    public static final int TYPE_SIZE = Integer.SIZE / Byte.SIZE;
+
     @Override
     public Integer parseColumn(ByteBuffer buffer, int length)
     {
-        assert length == Integer.SIZE / Byte.SIZE;
+        assert length == TYPE_SIZE;
         return buffer.getInt();
     }
 
-    @Override
-    public Integer getDefault()
-    {
-        return 0;
-    }
-
     @Override
     public ByteBuffer serialize(Integer value)
     {
-        return ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(0, value);
+        return ByteBuffer.allocate(TYPE_SIZE).putInt(0, value);
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/LongType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/LongType.java
index 25ec619..4b9a1db 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/LongType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/LongType.java
@@ -23,22 +23,18 @@ import java.nio.ByteBuffer;
 
 public class LongType implements ColumnType<Long>
 {
+    public static final int TYPE_SIZE = Long.SIZE / Byte.SIZE;
+
     @Override
     public Long parseColumn(ByteBuffer buffer, int length)
     {
-        assert length == Long.SIZE / Byte.SIZE;
+        assert length == TYPE_SIZE;
         return buffer.getLong();
     }
 
-    @Override
-    public Long getDefault()
-    {
-        return 0L;
-    }
-
     @Override
     public ByteBuffer serialize(Long value)
     {
-        return ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong(0, value);
+        return ByteBuffer.allocate(TYPE_SIZE).putLong(0, value);
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringType.java
index 9e2d228..d8daee9 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringType.java
@@ -20,29 +20,21 @@
 package org.apache.cassandra.spark.common.schema;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 
 public class StringType implements ColumnType<String>
 {
-    private static final Charset UTF_8 = Charset.forName("UTF-8");
-
     @Override
     public String parseColumn(ByteBuffer buffer, int length)
     {
         byte[] value = new byte[length];
         buffer.get(value, 0, length);
-        return new String(value, UTF_8);
-    }
-
-    @Override
-    public String getDefault()
-    {
-        return "";
+        return new String(value, StandardCharsets.UTF_8);
     }
 
     @Override
     public ByteBuffer serialize(String value)
     {
-        return ByteBuffer.wrap(value.getBytes(UTF_8));
+        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringUuidType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringUuidType.java
index 9253cd6..9b25ecc 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringUuidType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringUuidType.java
@@ -30,12 +30,6 @@ public class StringUuidType implements ColumnType<String>
         return ColumnTypes.UUID.parseColumn(buffer, length).toString();
     }
 
-    @Override
-    public String getDefault()
-    {
-        return "";
-    }
-
     @Override
     public ByteBuffer serialize(String valueStr)
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/TimestampType.java
similarity index 64%
copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java
copy to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/TimestampType.java
index 9a39c59..1dface3 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/TimestampType.java
@@ -19,33 +19,41 @@
 
 package org.apache.cassandra.spark.common.schema;
 
-import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Date;
 
-public interface ColumnType<T> extends Serializable
+/**
+ * Provides functionality to convert {@link ByteBuffer}s to a {@link Date} column type and to serialize
+ * {@link Date} types to {@link ByteBuffer}s
+ */
+public class TimestampType implements ColumnType<Date>
 {
+    public static final int TYPE_SIZE = Long.SIZE / Byte.SIZE;
+
     /**
      * Parses a value of this type from buffer. Value will be parsed from current position of the buffer. After
      * completion of the function, position will be moved by "length" bytes.
      *
      * @param buffer Buffer to parse column from
      * @param length Serialized value size in buffer is as big as length
-     * @return value as Java type
-     */
-    T parseColumn(ByteBuffer buffer, int length);
-
-    /**
-     * Default value, in case column value doesn't exist for CQL row
-     *
-     * @return the default value for the column type
+     * @return value as {@link Date} type
      */
-    T getDefault();
+    @Override
+    public Date parseColumn(ByteBuffer buffer, int length)
+    {
+        assert length == TYPE_SIZE;
+        return new Date(buffer.getLong());
+    }
 
     /**
-     * Serialize into ByteBuffer and keeps the position at beginning of ByteBuffer
+     * Serialize {@link Date} into ByteBuffer and keeps the position at beginning of ByteBuffer
      *
      * @param value the value to serialize
      * @return A ByteBuffer containing the serialized value
      */
-    ByteBuffer serialize(T value);
+    @Override
+    public ByteBuffer serialize(Date value)
+    {
+        return ByteBuffer.allocate(TYPE_SIZE).putLong(0, value.getTime());
+    }
 }
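
The new TimestampType round-trips a java.util.Date through its 8-byte epoch-millisecond encoding; because serialize() uses an absolute put, the returned buffer's position stays at zero and parseColumn() can read the value back directly. A quick round-trip sketch (illustrative, not part of the patch):

    // Round trip through the new type; TYPE_SIZE is 8 bytes (one long)
    TimestampType type = new TimestampType();
    java.util.Date original = new java.util.Date();
    java.nio.ByteBuffer buffer = type.serialize(original);
    java.util.Date parsed = type.parseColumn(buffer, TimestampType.TYPE_SIZE);
    assert original.equals(parsed);  // millisecond-precision round trip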
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/UuidType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/UuidType.java
index f6b4b82..daf27b3 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/UuidType.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/UuidType.java
@@ -42,12 +42,6 @@ public class UuidType implements ColumnType<UUID>
         return parseUUID(buffer, length);
     }
 
-    @Override
-    public UUID getDefault()
-    {
-        return UUID.randomUUID();
-    }
-
     @Override
     public ByteBuffer serialize(UUID value)
     {
diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
index a339be2..09c5e47 100644
--- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
+++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java
@@ -103,7 +103,6 @@ public class CassandraCluster<I extends IInstance> implements IClusterExtension<
                       .withConfig(config -> configuration.features.forEach(config::with))
                       .withTokenSupplier(tokenSupplier);
 
-
         if (dcCount > 1)
         {
             clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount,
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyIntegrationTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyIntegrationTest.java
new file mode 100644
index 0000000..553d536
--- /dev/null
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyIntegrationTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.analytics;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.sidecar.testing.QualifiedName;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.apache.cassandra.analytics.SparkTestUtils.VALIDATION_DEFAULT_COLUMNS_MAPPER;
+import static org.apache.cassandra.analytics.SparkTestUtils.VALIDATION_DEFAULT_ROW_MAPPER;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.ASCII;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BIGINT;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BLOB;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BOOLEAN;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DOUBLE;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TEXT;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TIMESTAMP;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TIMEUUID;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.UUID;
+import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.VARCHAR;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test that performs bulk reads and writes using different types for the partition key to test
+ * type mapping for the partition key
+ */
+class PartitionKeyIntegrationTest extends SharedClusterSparkIntegrationTestBase
+{
+    static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
+    static final List<String> REDUCED_DATASET_FOR_BOOLEAN = Arrays.asList("a", "b");
+    static final String CREATE_TEST_TABLE_SINGLE_PK_STATEMENT =
+    "CREATE TABLE IF NOT EXISTS %s (id %s, course text, marks int, PRIMARY KEY (id)) WITH read_repair='NONE';";
+    static final String CREATE_TEST_TABLE_COMPOSITE_PK_STATEMENT =
+    "CREATE TABLE IF NOT EXISTS %s (id %s, course %s, marks int, PRIMARY KEY ((id,course))) WITH read_repair='NONE';";
+
+    // Reference timestamp
+    static final long CURRENT_TIMESTAMP = System.currentTimeMillis();
+
+    @ParameterizedTest(name = "CQL Type for Partition Key={0}")
+    @MethodSource("mappings")
+    void allTypesInSinglePartitionKey(String typeName,
+                                      Function<Integer, String> ignored,
+                                      Function<Object[], String> columnsMapper,
+                                      Function<Row, String> rowMapper)
+    {
+        QualifiedName sourceTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_source");
+        QualifiedName targetTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_target");
+
+        Dataset<Row> data = bulkReaderDataFrame(sourceTableName).load();
+        if (typeName.equals(BOOLEAN))
+        {
+            assertThat(data.count()).isEqualTo(REDUCED_DATASET_FOR_BOOLEAN.size());
+        }
+        else
+        {
+            assertThat(data.count()).isEqualTo(DATASET.size());
+        }
+        List<Row> rowList = data.collectAsList().stream()
+                                .sorted(Comparator.comparing(row -> row.getInt(2)))
+                                .collect(Collectors.toList());
+
+        bulkWriterDataFrameWriter(data, targetTableName).save();
+        sparkTestUtils.validateWrites(rowList, queryAllData(targetTableName), columnsMapper, rowMapper);
+    }
+
+    @ParameterizedTest(name = "CQL Type for Composite Partition Key={0}-{0}")
+    @MethodSource("mappings")
+    void allTypesInCompositePartitionKey(String typeName,
+                                         Function<Integer, String> ignored,
+                                         Function<Object[], String> columnsMapper,
+                                         Function<Row, String> rowMapper)
+    {
+        QualifiedName compositeSourceTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_composite_source");
+        QualifiedName compositeTargetTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_composite_target");
+
+        Dataset<Row> data = bulkReaderDataFrame(compositeSourceTableName).load();
+        if (typeName.equals(BOOLEAN))
+        {
+            assertThat(data.count()).isEqualTo(REDUCED_DATASET_FOR_BOOLEAN.size());
+        }
+        else
+        {
+            assertThat(data.count()).isEqualTo(DATASET.size());
+        }
+        List<Row> rowList = data.collectAsList().stream()
+                                .sorted(Comparator.comparing(row -> row.getInt(2)))
+                                .collect(Collectors.toList());
+
+        bulkWriterDataFrameWriter(data, compositeTargetTableName).save();
+        sparkTestUtils.validateWrites(rowList, queryAllData(compositeTargetTableName), columnsMapper, rowMapper);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        createTestKeyspace(TEST_KEYSPACE, DC1_RF1);
+        mappings().forEach(arguments -> {
+            Object[] args = arguments.get();
+            Object typeName = args[0];
+            QualifiedName sourceTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_source");
+            QualifiedName targetTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_target");
+            createTestTable(sourceTableName, String.format(CREATE_TEST_TABLE_SINGLE_PK_STATEMENT, "%s", typeName));
+            createTestTable(targetTableName, String.format(CREATE_TEST_TABLE_SINGLE_PK_STATEMENT, "%s", typeName));
+
+            QualifiedName compositeSourceTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_composite_source");
+            QualifiedName compositeTargetTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_composite_target");
+
+            createTestTable(compositeSourceTableName, String.format(CREATE_TEST_TABLE_COMPOSITE_PK_STATEMENT, "%s", typeName, typeName));
+            createTestTable(compositeTargetTableName, String.format(CREATE_TEST_TABLE_COMPOSITE_PK_STATEMENT, "%s", typeName, typeName));
+
+            Function<Integer, String> typeToRowValueFn = (Function<Integer, String>) args[1];
+            if (typeName.equals(BOOLEAN))
+            {
+                populateTable(sourceTableName, REDUCED_DATASET_FOR_BOOLEAN, typeToRowValueFn);
+                populateCompositePKTable(compositeSourceTableName, REDUCED_DATASET_FOR_BOOLEAN, typeToRowValueFn);
+            }
+            else
+            {
+                populateTable(sourceTableName, DATASET, typeToRowValueFn);
+                populateCompositePKTable(compositeSourceTableName, DATASET, typeToRowValueFn);
+            }
+        });
+    }
+
+    void populateTable(QualifiedName tableName, List<String> values, Function<Integer, String> typeToRowValueFn)
+    {
+        ICoordinator coordinator = cluster.getFirstRunningInstance().coordinator();
+        for (int i = 0; i < values.size(); i++)
+        {
+            String value = values.get(i);
+            String query = String.format("INSERT INTO %s (id, course, marks) VALUES (%s,'%s',%d) ",
+                                         tableName, typeToRowValueFn.apply(i), "course_" + value, 80 + i);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+    }
+
+    void populateCompositePKTable(QualifiedName tableName, List<String> values, Function<Integer, String> typeToRowValueFn)
+    {
+        ICoordinator coordinator = cluster.getFirstRunningInstance().coordinator();
+        for (int i = 0; i < values.size(); i++)
+        {
+            String value = typeToRowValueFn.apply(i);
+            String query = String.format("INSERT INTO %s (id, course, marks) VALUES (%s,%s,%d) ",
+                                         tableName, value, value, 80 + i);
+            coordinator.execute(query, ConsistencyLevel.ALL);
+        }
+    }
+
+    Stream<Arguments> mappings()
+    {
+        return Stream.of(
+        Arguments.of(BIGINT, (Function<Integer, String>) String::valueOf,
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER),
+        Arguments.of(BLOB, (Function<Integer, String>) value -> String.format("bigintAsBlob(%d)", value),
+                     (Function<Object[], String>) ((Object[] columns) -> {
+                         Object col0 = new BigInteger(ByteBufferUtils.getArray((ByteBuffer) columns[0])).toString();
+                         if (columns[1] instanceof ByteBuffer)
+                         {
+                             Object col1 = new BigInteger(ByteBufferUtils.getArray((ByteBuffer) columns[1])).toString();
+                             return String.format("%s:%s:%s", col0, col1, columns[2]);
+                         }
+                         else
+                         {
+                             return String.format("%s:%s:%s", col0, columns[1], columns[2]);
+                         }
+                     }),
+                     (Function<Row, String>) row -> {
+                         byte[] col0 = (byte[]) row.get(0);
+                         Object col1 = row.get(1);
+
+                         if (col1 instanceof byte[])
+                         {
+                             return String.format("%s:%s:%d", new BigInteger(col0), new BigInteger((byte[]) col1), row.getInt(2));
+                         }
+                         else
+                         {
+                             return String.format("%s:%s:%d", new BigInteger(col0), col1, row.getInt(2));
+                         }
+                     }),
+        Arguments.of(DOUBLE, (Function<Integer, String>) String::valueOf,
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER),
+        Arguments.of(INT, (Function<Integer, String>) String::valueOf,
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER),
+        Arguments.of(BOOLEAN, (Function<Integer, String>) value -> value % 2 == 0 ? "true" : "false",
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER),
+        Arguments.of(TEXT, (Function<Integer, String>) value -> String.format("'%d'", value),
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER),
+        Arguments.of(TIMESTAMP, (Function<Integer, String>) offset -> String.valueOf(CURRENT_TIMESTAMP + offset),
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER,
+                     (Function<Row, String>) row -> {
+                         Date col0 = new Date(row.getTimestamp(0).getTime());
+                         Object col1 = row.get(1);
+                         if (col1 instanceof Timestamp)
+                         {
+                             return String.format("%s:%s:%d", col0, new Date(((Timestamp) col1).getTime()), row.getInt(2));
+                         }
+                         return String.format("%s:%s:%d", col0, row.get(1), row.getInt(2));
+                     }),
+        Arguments.of(UUID, (Function<Integer, String>) value -> new java.util.UUID(0, value).toString(),
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER),
+        Arguments.of(VARCHAR, (Function<Integer, String>) value -> String.format("'%d'", value),
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER),
+        Arguments.of(ASCII, (Function<Integer, String>) value -> String.format("'%d'", value),
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER),
+        Arguments.of(TIMEUUID, (Function<Integer, String>) offset -> UUIDs.startOf(CURRENT_TIMESTAMP + offset).toString(),
+                     VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER)
+        );
+    }
+}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index e49ffdb..de14a2b 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -47,6 +48,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
+import org.jetbrains.annotations.NotNull;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.catchThrowable;
@@ -56,6 +58,15 @@ import static org.assertj.core.api.Assertions.catchThrowable;
  */
 public class SparkTestUtils
 {
+    /**
+     * Maps a row queried from Cassandra, represented as an object array, and it produces a string representation
+     * to perform validation of data written by a bulk writer job
+     */
+    public static final Function<Object[], String> VALIDATION_DEFAULT_COLUMNS_MAPPER = columns -> String.format("%s:%s:%s", columns[0], columns[1], columns[2]);
+    /**
+     * Maps a {@link Row} to a string representation used to validate data written by a bulk writer job
+     */
+    public static final Function<Row, String> VALIDATION_DEFAULT_ROW_MAPPER = row -> String.format("%s:%s:%d", row.get(0), row.get(1), row.getInt(2));
     protected ICluster<? extends IInstance> cluster;
     protected DnsResolver dnsResolver;
     protected int sidecarPort;
@@ -166,13 +177,18 @@ public class SparkTestUtils
     }
 
     public void validateWrites(List<Row> sourceData, Object[][] queriedData)
+    {
+        validateWrites(sourceData, queriedData, VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER);
+    }
+
+    public void validateWrites(List<Row> sourceData,
+                               @NotNull Object[][] queriedData,
+                               @NotNull Function<Object[], String> columnsMapper,
+                               @NotNull Function<Row, String> rowMapper)
     {
         // build a set of entries read from Cassandra into a set
         Set<String> actualEntries = Arrays.stream(queriedData)
-                                          .map((Object[] columns) -> String.format("%s:%s:%s",
-                                                                                   columns[0],
-                                                                                   columns[1],
-                                                                                   columns[2]))
+                                          .map(columnsMapper)
                                           .collect(Collectors.toSet());
 
         // Number of entries in Cassandra must match the original datasource
@@ -180,10 +196,7 @@ public class SparkTestUtils
 
         // remove from actual entries to make sure that the data read is the same as the data written
         sourceData.forEach(row -> {
-            String key = String.format("%d:%s:%d",
-                                       row.getInt(0),
-                                       row.getString(1),
-                                       row.getInt(2));
+            String key = rowMapper.apply(row);
             assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries")
                                                  .isTrue();
         });
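
The new validateWrites overload lets a test supply its own mappers when the partition key does not have the int/string/int shape the old hard-coded format strings assumed. A usage sketch in the spirit of PartitionKeyIntegrationTest's timestamp case (variable names are illustrative, not from the patch):

    // Validate written rows against queried rows, rendering the timestamp key explicitly
    sparkTestUtils.validateWrites(rowList,
                                  queryAllData(targetTableName),
                                  VALIDATION_DEFAULT_COLUMNS_MAPPER,
                                  row -> String.format("%s:%s:%d", row.getTimestamp(0), row.get(1), row.getInt(2)));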

