This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new 8dce35f CASSANDRA-19716: Invalid mapping when timestamp is used as a partition key during bulk writes (#60) 8dce35f is described below commit 8dce35f1cb3c204be669548ee286055b12e67fe9 Author: Francisco Guerrero <fran...@apache.org> AuthorDate: Thu Jun 20 14:24:21 2024 -0700 CASSANDRA-19716: Invalid mapping when timestamp is used as a partition key during bulk writes (#60) Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19716 --- CHANGES.txt | 1 + .../spark/bulkwriter/CqlTableInfoProvider.java | 2 +- .../cassandra/spark/common/schema/BooleanType.java | 6 - .../cassandra/spark/common/schema/BytesType.java | 6 - .../spark/common/schema/CollectionType.java | 7 +- .../cassandra/spark/common/schema/ColumnType.java | 7 - .../cassandra/spark/common/schema/ColumnTypes.java | 68 +----- .../cassandra/spark/common/schema/DoubleType.java | 6 - .../cassandra/spark/common/schema/IntegerType.java | 13 +- .../cassandra/spark/common/schema/LongType.java | 12 +- .../cassandra/spark/common/schema/StringType.java | 14 +- .../spark/common/schema/StringUuidType.java | 6 - .../schema/{ColumnType.java => TimestampType.java} | 34 +-- .../cassandra/spark/common/schema/UuidType.java | 6 - .../distributed/impl/CassandraCluster.java | 1 - .../analytics/PartitionKeyIntegrationTest.java | 249 +++++++++++++++++++++ .../apache/cassandra/analytics/SparkTestUtils.java | 29 ++- 17 files changed, 308 insertions(+), 159 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 4c16a54..2e654ef 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Invalid mapping when timestamp is used as a partition key during bulk writes (CASSANDRA-19716) * NullPointerException when reading static column with null values (CASSANDRA-19626) * Integrate with the latest sidecar client (CASSANDRA-19616) * Support bulk write via S3 (CASSANDRA-19563) diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java index 29977cf..981c24c 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CqlTableInfoProvider.java @@ -68,7 +68,7 @@ public class CqlTableInfoProvider implements TableInfoProvider .put(INT, ColumnTypes.INT) .put(BOOLEAN, ColumnTypes.BOOLEAN) .put(TEXT, ColumnTypes.STRING) - .put(TIMESTAMP, ColumnTypes.LONG) + .put(TIMESTAMP, ColumnTypes.TIMESTAMP) .put(UUID, ColumnTypes.UUID) .put(VARCHAR, ColumnTypes.STRING) .put(ASCII, ColumnTypes.STRING) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BooleanType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BooleanType.java index 8bfb555..5b998fd 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BooleanType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BooleanType.java @@ -37,12 +37,6 @@ public class BooleanType implements ColumnType<Boolean> return getBoolean(ByteBuffer.wrap(value)); } - @Override - public Boolean getDefault() - { - return false; - } - @Override public ByteBuffer serialize(Boolean value) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BytesType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BytesType.java index e4b03b3..48fd87d 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BytesType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/BytesType.java @@ -32,12 +32,6 @@ public class BytesType implements 
ColumnType<ByteBuffer> return ByteBuffer.wrap(value); } - @Override - public ByteBuffer getDefault() - { - return ByteBuffer.wrap(new byte[0]); - } - @Override public ByteBuffer serialize(ByteBuffer value) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/CollectionType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/CollectionType.java index a786ef3..62a735f 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/CollectionType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/CollectionType.java @@ -22,7 +22,6 @@ package org.apache.cassandra.spark.common.schema; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Collection; -import java.util.Collections; import java.util.List; public abstract class CollectionType<EntryType, IntermediateType> implements ColumnType<Collection<EntryType>>, Serializable @@ -35,13 +34,9 @@ public abstract class CollectionType<EntryType, IntermediateType> implements Col public abstract IntermediateType parseCollectionColumn(ByteBuffer colNameSuffix, ByteBuffer colValue); - public Collection<EntryType> getDefault() - { - return Collections.emptyList(); - } - public abstract List<EntryType> finaliseCollection(List<IntermediateType> entryList); + @Override public ByteBuffer serialize(Collection<EntryType> value) { throw new UnsupportedOperationException("Doesn't make much sense"); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java index 9a39c59..7eccc8b 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java @@ -34,13 +34,6 @@ public interface 
ColumnType<T> extends Serializable */ T parseColumn(ByteBuffer buffer, int length); - /** - * Default value, in case column value doesn't exist for CQL row - * - * @return the default value for the column type - */ - T getDefault(); - /** * Serialize into ByteBuffer and keeps the position at beginning of ByteBuffer * diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnTypes.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnTypes.java index a6d35d8..e6f5cae 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnTypes.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnTypes.java @@ -21,6 +21,7 @@ package org.apache.cassandra.spark.common.schema; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Date; @SuppressWarnings("unused") // Extra types can become useful in the future public final class ColumnTypes implements Serializable @@ -34,72 +35,7 @@ public final class ColumnTypes implements Serializable public static final ColumnType<java.util.UUID> UUID = new UuidType(); public static final ColumnType<Double> DOUBLE = new DoubleType(); public static final ColumnType<Boolean> BOOLEAN = new BooleanType(); - - // Nullable Types - public static final ColumnType<String> NULLABLE_STRING = new StringType() - { - @Override - public String getDefault() - { - return null; - } - }; - public static final ColumnType<Integer> NULLABLE_INT = new IntegerType() - { - @Override - public Integer getDefault() - { - return null; - } - }; - public static final ColumnType<Long> NULLABLE_LONG = new LongType() - { - @Override - public Long getDefault() - { - return null; - } - }; - public static final ColumnType<ByteBuffer> NULLABLE_BYTES = new BytesType() - { - @Override - public ByteBuffer getDefault() - { - return null; - } - }; - public static final ColumnType<String> NULLABLE_STRING_UUID 
= new StringUuidType() - { - @Override - public String getDefault() - { - return null; - } - }; - public static final ColumnType<java.util.UUID> NULLABLE_UUID = new UuidType() - { - @Override - public java.util.UUID getDefault() - { - return null; - } - }; - public static final ColumnType<Double> NULLABLE_DOUBLE = new DoubleType() - { - @Override - public Double getDefault() - { - return null; - } - }; - public static final ColumnType<Boolean> NULLABLE_BOOLEAN = new BooleanType() - { - @Override - public Boolean getDefault() - { - return null; - } - }; + public static final ColumnType<Date> TIMESTAMP = new TimestampType(); private ColumnTypes() { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/DoubleType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/DoubleType.java index 22b78bb..f59d792 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/DoubleType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/DoubleType.java @@ -36,12 +36,6 @@ public class DoubleType implements ColumnType<Double> return getDouble(ByteBuffer.wrap(value)); } - @Override - public Double getDefault() - { - return 0d; - } - @Override public ByteBuffer serialize(Double value) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/IntegerType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/IntegerType.java index 5c7f113..d3688c5 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/IntegerType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/IntegerType.java @@ -23,22 +23,19 @@ import java.nio.ByteBuffer; public class IntegerType implements ColumnType<Integer> { + + public static final int TYPE_SIZE = Integer.SIZE / Byte.SIZE; + @Override public Integer parseColumn(ByteBuffer 
buffer, int length) { - assert length == Integer.SIZE / Byte.SIZE; + assert length == TYPE_SIZE; return buffer.getInt(); } - @Override - public Integer getDefault() - { - return 0; - } - @Override public ByteBuffer serialize(Integer value) { - return ByteBuffer.allocate(Integer.SIZE / Byte.SIZE).putInt(0, value); + return ByteBuffer.allocate(TYPE_SIZE).putInt(0, value); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/LongType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/LongType.java index 25ec619..4b9a1db 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/LongType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/LongType.java @@ -23,22 +23,18 @@ import java.nio.ByteBuffer; public class LongType implements ColumnType<Long> { + public static final int TYPE_SIZE = Long.SIZE / Byte.SIZE; + @Override public Long parseColumn(ByteBuffer buffer, int length) { - assert length == Long.SIZE / Byte.SIZE; + assert length == TYPE_SIZE; return buffer.getLong(); } - @Override - public Long getDefault() - { - return 0L; - } - @Override public ByteBuffer serialize(Long value) { - return ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong(0, value); + return ByteBuffer.allocate(TYPE_SIZE).putLong(0, value); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringType.java index 9e2d228..d8daee9 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringType.java @@ -20,29 +20,21 @@ package org.apache.cassandra.spark.common.schema; import java.nio.ByteBuffer; -import java.nio.charset.Charset; +import 
java.nio.charset.StandardCharsets; public class StringType implements ColumnType<String> { - private static final Charset UTF_8 = Charset.forName("UTF-8"); - @Override public String parseColumn(ByteBuffer buffer, int length) { byte[] value = new byte[length]; buffer.get(value, 0, length); - return new String(value, UTF_8); - } - - @Override - public String getDefault() - { - return ""; + return new String(value, StandardCharsets.UTF_8); } @Override public ByteBuffer serialize(String value) { - return ByteBuffer.wrap(value.getBytes(UTF_8)); + return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8)); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringUuidType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringUuidType.java index 9253cd6..9b25ecc 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringUuidType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/StringUuidType.java @@ -30,12 +30,6 @@ public class StringUuidType implements ColumnType<String> return ColumnTypes.UUID.parseColumn(buffer, length).toString(); } - @Override - public String getDefault() - { - return ""; - } - @Override public ByteBuffer serialize(String valueStr) { diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/TimestampType.java similarity index 64% copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java copy to cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/TimestampType.java index 9a39c59..1dface3 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/ColumnType.java +++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/TimestampType.java @@ -19,33 +19,41 @@ package org.apache.cassandra.spark.common.schema; -import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Date; -public interface ColumnType<T> extends Serializable +/** + * Provides functionality to convert {@link ByteBuffer}s to a {@link Date} column type and to serialize + * {@link Date} types to {@link ByteBuffer}s + */ +public class TimestampType implements ColumnType<Date> { + public static final int TYPE_SIZE = Long.SIZE / Byte.SIZE; + /** * Parses a value of this type from buffer. Value will be parsed from current position of the buffer. After * completion of the function, position will be moved by "length" bytes. * * @param buffer Buffer to parse column from * @param length Serialized value size in buffer is as big as length - * @return value as Java type - */ - T parseColumn(ByteBuffer buffer, int length); - - /** - * Default value, in case column value doesn't exist for CQL row - * - * @return the default value for the column type + * @return value as {@link Date} type */ - T getDefault(); + @Override + public Date parseColumn(ByteBuffer buffer, int length) + { + assert length == TYPE_SIZE; + return new Date(buffer.getLong()); + } /** - * Serialize into ByteBuffer and keeps the position at beginning of ByteBuffer + * Serialize {@link Date} into ByteBuffer and keeps the position at beginning of ByteBuffer * * @param value the value to serialize * @return A ByteBuffer containing the serialized value */ - ByteBuffer serialize(T value); + @Override + public ByteBuffer serialize(Date value) + { + return ByteBuffer.allocate(TYPE_SIZE).putLong(0, value.getTime()); + } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/UuidType.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/UuidType.java index f6b4b82..daf27b3 100644 --- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/UuidType.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/common/schema/UuidType.java @@ -42,12 +42,6 @@ public class UuidType implements ColumnType<UUID> return parseUUID(buffer, length); } - @Override - public UUID getDefault() - { - return UUID.randomUUID(); - } - @Override public ByteBuffer serialize(UUID value) { diff --git a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java index a339be2..09c5e47 100644 --- a/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java +++ b/cassandra-analytics-integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java @@ -103,7 +103,6 @@ public class CassandraCluster<I extends IInstance> implements IClusterExtension< .withConfig(config -> configuration.features.forEach(config::with)) .withTokenSupplier(tokenSupplier); - if (dcCount > 1) { clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount, diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyIntegrationTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyIntegrationTest.java new file mode 100644 index 0000000..553d536 --- /dev/null +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/PartitionKeyIntegrationTest.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.analytics; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Date; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import com.datastax.driver.core.utils.UUIDs; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICoordinator; +import org.apache.cassandra.sidecar.testing.QualifiedName; +import org.apache.cassandra.spark.utils.ByteBufferUtils; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import static org.apache.cassandra.analytics.SparkTestUtils.VALIDATION_DEFAULT_COLUMNS_MAPPER; +import static org.apache.cassandra.analytics.SparkTestUtils.VALIDATION_DEFAULT_ROW_MAPPER; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.ASCII; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BIGINT; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BLOB; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.BOOLEAN; +import static 
org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DOUBLE; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TEXT; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TIMESTAMP; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.TIMEUUID; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.UUID; +import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.VARCHAR; +import static org.apache.cassandra.testing.TestUtils.DC1_RF1; +import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration test that performs bulk reads and writes using different types for the partition key to test + * type mapping for the partition key + */ +class PartitionKeyIntegrationTest extends SharedClusterSparkIntegrationTestBase +{ + static final List<String> DATASET = Arrays.asList("a", "b", "c", "d", "e", "f", "g"); + static final List<String> REDUCED_DATASET_FOR_BOOLEAN = Arrays.asList("a", "b"); + static final String CREATE_TEST_TABLE_SINGLE_PK_STATEMENT = + "CREATE TABLE IF NOT EXISTS %s (id %s, course text, marks int, PRIMARY KEY (id)) WITH read_repair='NONE';"; + static final String CREATE_TEST_TABLE_COMPOSITE_PK_STATEMENT = + "CREATE TABLE IF NOT EXISTS %s (id %s, course %s, marks int, PRIMARY KEY ((id,course))) WITH read_repair='NONE';"; + + // Reference timestamp + static final long CURRENT_TIMESTAMP = System.currentTimeMillis(); + + @ParameterizedTest(name = "CQL Type for Partition Key={0}") + @MethodSource("mappings") + void allTypesInSinglePartitionKey(String typeName, + Function<Integer, String> ignored, + Function<Object[], String> columnsMapper, + Function<Row, String> rowMapper) + { + QualifiedName sourceTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_source"); + QualifiedName 
targetTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_target"); + + Dataset<Row> data = bulkReaderDataFrame(sourceTableName).load(); + if (typeName.equals(BOOLEAN)) + { + assertThat(data.count()).isEqualTo(REDUCED_DATASET_FOR_BOOLEAN.size()); + } + else + { + assertThat(data.count()).isEqualTo(DATASET.size()); + } + List<Row> rowList = data.collectAsList().stream() + .sorted(Comparator.comparing(row -> row.getInt(2))) + .collect(Collectors.toList()); + + bulkWriterDataFrameWriter(data, targetTableName).save(); + sparkTestUtils.validateWrites(rowList, queryAllData(targetTableName), columnsMapper, rowMapper); + } + + @ParameterizedTest(name = "CQL Type for Composite Partition Key={0}-{0}") + @MethodSource("mappings") + void allTypesInCompositePartitionKey(String typeName, + Function<Integer, String> ignored, + Function<Object[], String> columnsMapper, + Function<Row, String> rowMapper) + { + QualifiedName compositeSourceTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_composite_source"); + QualifiedName compositeTargetTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_composite_target"); + + Dataset<Row> data = bulkReaderDataFrame(compositeSourceTableName).load(); + if (typeName.equals(BOOLEAN)) + { + assertThat(data.count()).isEqualTo(REDUCED_DATASET_FOR_BOOLEAN.size()); + } + else + { + assertThat(data.count()).isEqualTo(DATASET.size()); + } + List<Row> rowList = data.collectAsList().stream() + .sorted(Comparator.comparing(row -> row.getInt(2))) + .collect(Collectors.toList()); + + bulkWriterDataFrameWriter(data, compositeTargetTableName).save(); + sparkTestUtils.validateWrites(rowList, queryAllData(compositeTargetTableName), columnsMapper, rowMapper); + } + + @SuppressWarnings("unchecked") + @Override + protected void initializeSchemaForTest() + { + createTestKeyspace(TEST_KEYSPACE, DC1_RF1); + mappings().forEach(arguments -> { + Object[] args = arguments.get(); + Object typeName = args[0]; + QualifiedName sourceTableName = new 
QualifiedName(TEST_KEYSPACE, typeName + "_source"); + QualifiedName targetTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_target"); + createTestTable(sourceTableName, String.format(CREATE_TEST_TABLE_SINGLE_PK_STATEMENT, "%s", typeName)); + createTestTable(targetTableName, String.format(CREATE_TEST_TABLE_SINGLE_PK_STATEMENT, "%s", typeName)); + + QualifiedName compositeSourceTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_composite_source"); + QualifiedName compositeTargetTableName = new QualifiedName(TEST_KEYSPACE, typeName + "_composite_target"); + + createTestTable(compositeSourceTableName, String.format(CREATE_TEST_TABLE_COMPOSITE_PK_STATEMENT, "%s", typeName, typeName)); + createTestTable(compositeTargetTableName, String.format(CREATE_TEST_TABLE_COMPOSITE_PK_STATEMENT, "%s", typeName, typeName)); + + Function<Integer, String> typeToRowValueFn = (Function<Integer, String>) args[1]; + if (typeName.equals(BOOLEAN)) + { + populateTable(sourceTableName, REDUCED_DATASET_FOR_BOOLEAN, typeToRowValueFn); + populateCompositePKTable(compositeSourceTableName, REDUCED_DATASET_FOR_BOOLEAN, typeToRowValueFn); + } + else + { + populateTable(sourceTableName, DATASET, typeToRowValueFn); + populateCompositePKTable(compositeSourceTableName, DATASET, typeToRowValueFn); + } + }); + } + + void populateTable(QualifiedName tableName, List<String> values, Function<Integer, String> typeToRowValueFn) + { + ICoordinator coordinator = cluster.getFirstRunningInstance().coordinator(); + for (int i = 0; i < values.size(); i++) + { + String value = values.get(i); + String query = String.format("INSERT INTO %s (id, course, marks) VALUES (%s,'%s',%d) ", + tableName, typeToRowValueFn.apply(i), "course_" + value, 80 + i); + coordinator.execute(query, ConsistencyLevel.ALL); + } + } + + void populateCompositePKTable(QualifiedName tableName, List<String> values, Function<Integer, String> typeToRowValueFn) + { + ICoordinator coordinator = 
cluster.getFirstRunningInstance().coordinator(); + for (int i = 0; i < values.size(); i++) + { + String value = typeToRowValueFn.apply(i); + String query = String.format("INSERT INTO %s (id, course, marks) VALUES (%s,%s,%d) ", + tableName, value, value, 80 + i); + coordinator.execute(query, ConsistencyLevel.ALL); + } + } + + Stream<Arguments> mappings() + { + return Stream.of( + Arguments.of(BIGINT, (Function<Integer, String>) String::valueOf, + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER), + Arguments.of(BLOB, (Function<Integer, String>) value -> String.format("bigintAsBlob(%d)", value), + (Function<Object[], String>) ((Object[] columns) -> { + Object col0 = new BigInteger(ByteBufferUtils.getArray((ByteBuffer) columns[0])).toString(); + if (columns[1] instanceof ByteBuffer) + { + Object col1 = new BigInteger(ByteBufferUtils.getArray((ByteBuffer) columns[1])).toString(); + return String.format("%s:%s:%s", col0, col1, columns[2]); + } + else + { + return String.format("%s:%s:%s", col0, columns[1], columns[2]); + } + }), + (Function<Row, String>) row -> { + byte[] col0 = (byte[]) row.get(0); + Object col1 = row.get(1); + + if (col1 instanceof byte[]) + { + return String.format("%s:%s:%d", new BigInteger(col0), new BigInteger((byte[]) col1), row.getInt(2)); + } + else + { + return String.format("%s:%s:%d", new BigInteger(col0), col1, row.getInt(2)); + } + }), + Arguments.of(DOUBLE, (Function<Integer, String>) String::valueOf, + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER), + Arguments.of(INT, (Function<Integer, String>) String::valueOf, + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER), + Arguments.of(BOOLEAN, (Function<Integer, String>) value -> value % 2 == 0 ? 
"true" : "false", + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER), + Arguments.of(TEXT, (Function<Integer, String>) value -> String.format("'%d'", value), + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER), + Arguments.of(TIMESTAMP, (Function<Integer, String>) offset -> String.valueOf(CURRENT_TIMESTAMP + offset), + VALIDATION_DEFAULT_COLUMNS_MAPPER, + (Function<Row, String>) row -> { + Date col0 = new Date(row.getTimestamp(0).getTime()); + Object col1 = row.get(1); + if (col1 instanceof Timestamp) + { + return String.format("%s:%s:%d", col0, new Date(((Timestamp) col1).getTime()), row.getInt(2)); + } + return String.format("%s:%s:%d", col0, row.get(1), row.getInt(2)); + }), + Arguments.of(UUID, (Function<Integer, String>) value -> new java.util.UUID(0, value).toString(), + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER), + Arguments.of(VARCHAR, (Function<Integer, String>) value -> String.format("'%d'", value), + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER), + Arguments.of(ASCII, (Function<Integer, String>) value -> String.format("'%d'", value), + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER), + Arguments.of(TIMEUUID, (Function<Integer, String>) offset -> UUIDs.startOf(CURRENT_TIMESTAMP + offset).toString(), + VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER) + ); + } +} diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java index e49ffdb..de14a2b 100644 --- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java +++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.UUID; +import 
java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -47,6 +48,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; +import org.jetbrains.annotations.NotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.catchThrowable; @@ -56,6 +58,15 @@ import static org.assertj.core.api.Assertions.catchThrowable; */ public class SparkTestUtils { + /** + * Maps a row queried from Cassandra, represented as an object array, and it produces a string representation + * to perform validation of data written by a bulk writer job + */ + public static final Function<Object[], String> VALIDATION_DEFAULT_COLUMNS_MAPPER = columns -> String.format("%s:%s:%s", columns[0], columns[1], columns[2]); + /** + * Maps a {@link Row} to a string representation used to validate data written by a bulk writer job + */ + public static final Function<Row, String> VALIDATION_DEFAULT_ROW_MAPPER = row -> String.format("%s:%s:%d", row.get(0), row.get(1), row.getInt(2)); protected ICluster<? 
extends IInstance> cluster; protected DnsResolver dnsResolver; protected int sidecarPort; @@ -166,13 +177,18 @@ public class SparkTestUtils } public void validateWrites(List<Row> sourceData, Object[][] queriedData) + { + validateWrites(sourceData, queriedData, VALIDATION_DEFAULT_COLUMNS_MAPPER, VALIDATION_DEFAULT_ROW_MAPPER); + } + + public void validateWrites(List<Row> sourceData, + @NotNull Object[][] queriedData, + @NotNull Function<Object[], String> columnsMapper, + @NotNull Function<Row, String> rowMapper) { // build a set of entries read from Cassandra into a set Set<String> actualEntries = Arrays.stream(queriedData) - .map((Object[] columns) -> String.format("%s:%s:%s", - columns[0], - columns[1], - columns[2])) + .map(columnsMapper) .collect(Collectors.toSet()); // Number of entries in Cassandra must match the original datasource @@ -180,10 +196,7 @@ public class SparkTestUtils // remove from actual entries to make sure that the data read is the same as the data written sourceData.forEach(row -> { - String key = String.format("%d:%s:%d", - row.getInt(0), - row.getString(1), - row.getInt(2)); + String key = rowMapper.apply(row); assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries") .isTrue(); }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org