This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push: new e0ae9d7 CASSANDRA-19334 Upgrade to Cassandra 4.0.12 and remove BufferMode and BatchSize options e0ae9d7 is described below commit e0ae9d7484e242f6af495aac2cb4d8dc121fba89 Author: Yifan Cai <y...@apache.org> AuthorDate: Wed Jan 24 15:38:41 2024 -0800 CASSANDRA-19334 Upgrade to Cassandra 4.0.12 and remove BufferMode and BatchSize options In cassandra-all:4.0.12, improvements were made for the CQLSSTableWriter. The sorted writer now can produce size-capped SSTables. It replaces the need for the unsorted sstable writer, which has to buffer and sort data on flushing. The dataset to write in the spark application is already sorted. By avoiding using the unsorted writer, it prevents wasting CPU time on sorting the sorted data. Since the sorted sstable writer does not need to buffer data, its size estimation is more accurate [...] By removing the unsorted sstable writer, it no longer requires the RowBufferMode option. By supporting size-capping in sorted writer, it no longer requires the BatchSize option. 
Patch by Yifan Cai; reviewed by Francisco Guerrero for CASSANDRA-19334 --- CHANGES.txt | 1 + .../cassandra/spark/bulkwriter/BulkSparkConf.java | 53 ++++++++------- .../spark/bulkwriter/CassandraJobInfo.java | 18 +---- .../apache/cassandra/spark/bulkwriter/JobInfo.java | 11 ++- .../cassandra/spark/bulkwriter/RecordWriter.java | 63 +++++++---------- .../cassandra/spark/bulkwriter/SSTableWriter.java | 3 +- .../spark/bulkwriter/SSTableWriterFactory.java | 3 - .../cassandra/spark/bulkwriter/WriterOptions.java | 4 +- .../spark/bulkwriter/BulkSparkConfTest.java | 54 --------------- .../spark/bulkwriter/MockBulkWriterContext.java | 30 +-------- .../spark/bulkwriter/RecordWriterTest.java | 78 ++++++++++------------ .../apache/cassandra/bridge/CassandraBridge.java | 1 - .../org/apache/cassandra/bridge/RowBufferMode.java | 41 ------------ cassandra-four-zero/build.gradle | 2 +- .../bridge/CassandraBridgeImplementation.java | 23 ++++++- .../bridge/SSTableWriterImplementation.java | 27 +++----- .../io/sstable/SSTableTombstoneWriter.java | 11 +-- .../bridge/SSTableWriterImplementationTest.java | 55 ++++++++------- scripts/build-dtest-jars.sh | 4 +- 19 files changed, 162 insertions(+), 320 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 7e60c77..b105092 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.0.0 + * Upgrade to Cassandra 4.0.12 and remove RowBufferMode and BatchSize options (CASSANDRA-19334) * Improve logging for bulk writes and on task failures (CASSANDRA-19331) * Allow setting TTL for snapshots created by Analytics bulk reader (CASSANDRA-19273) * Fix range split and use open-closed range notation consistently (CASSANDRA-19325) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java index 8c97b07..1a16ec3 100644 --- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java @@ -39,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.bridge.CassandraBridgeFactory; -import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.clients.SidecarInstanceImpl; import org.apache.cassandra.sidecar.client.SidecarInstance; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; @@ -83,8 +82,7 @@ public class BulkSparkConf implements Serializable public static final long DEFAULT_SIDECAR_REQUEST_MAX_RETRY_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(60L); public static final int DEFAULT_COMMIT_BATCH_SIZE = 10_000; public static final int DEFAULT_RING_RETRY_COUNT = 3; - public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = RowBufferMode.UNBUFFERED; - public static final int DEFAULT_BATCH_SIZE_IN_ROWS = 1_000_000; + public static final int DEFAULT_SSTABLE_DATA_SIZE_IN_MIB = 160; // NOTE: All Cassandra Analytics setting names must start with "spark" in order to not be ignored by Spark, // and must not start with "spark.cassandra" so as to not conflict with Spark Cassandra Connector @@ -108,9 +106,7 @@ public class BulkSparkConf implements Serializable public final ConsistencyLevel.CL consistencyLevel; public final String localDC; public final Integer numberSplits; - public final RowBufferMode rowBufferMode; - public final Integer sstableDataSizeInMB; - public final Integer sstableBatchSize; + public final Integer sstableDataSizeInMiB; public final int commitBatchSize; public final boolean skipExtendedVerify; public final WriteMode writeMode; @@ -147,9 +143,7 @@ public class BulkSparkConf implements Serializable this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM")); this.localDC = 
MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null); this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits"); - this.rowBufferMode = MapUtils.getEnumOption(options, WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row buffering mode"); - this.sstableDataSizeInMB = MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), 160, "sstable data size in MB"); - this.sstableBatchSize = MapUtils.getInt(options, WriterOptions.BATCH_SIZE.name(), 1_000_000, "sstable batch size"); + this.sstableDataSizeInMiB = resolveSSTableDataSizeInMiB(options); this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size"); this.commitThreadsPerInstance = MapUtils.getInt(options, WriterOptions.COMMIT_THREADS_PER_INSTANCE.name(), 2, "commit threads per instance"); this.keystorePassword = MapUtils.getOrDefault(options, WriterOptions.KEYSTORE_PASSWORD.name(), null); @@ -179,6 +173,29 @@ public class BulkSparkConf implements Serializable .collect(Collectors.toSet()); } + protected int resolveSSTableDataSizeInMiB(Map<String, String> options) + { + int legacyOptionValue = -1; + if (options.containsKey(WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name())) + { + LOGGER.warn("The writer option: SSTABLE_DATA_SIZE_IN_MB is deprecated. " + + "Please use SSTABLE_DATA_SIZE_IN_MIB instead. See option description for details."); + legacyOptionValue = MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), + DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable data size in mebibytes"); + } + + if (options.containsKey(WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name())) + { + LOGGER.info("The writer option: SSTABLE_DATA_SIZE_IN_MIB is defined. 
" + + "Favor the value over SSTABLE_DATA_SIZE_IN_MB"); + return MapUtils.getInt(options, WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name(), + DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable data size in mebibytes"); + + } + + return legacyOptionValue == -1 ? DEFAULT_SSTABLE_DATA_SIZE_IN_MIB : legacyOptionValue; + } + protected Set<? extends SidecarInstance> buildSidecarInstances(Map<String, String> options, int sidecarPort) { String sidecarInstances = MapUtils.getOrThrow(options, WriterOptions.SIDECAR_INSTANCES.name(), "sidecar_instances"); @@ -193,27 +210,9 @@ public class BulkSparkConf implements Serializable Preconditions.checkNotNull(table); Preconditions.checkArgument(getHttpResponseTimeoutMs() > 0, HTTP_RESPONSE_TIMEOUT + " must be > 0"); validateSslConfiguration(); - validateTableWriterSettings(); CassandraBridgeFactory.validateBridges(); } - protected void validateTableWriterSettings() - { - boolean batchSizeIsZero = sstableBatchSize == 0; - - if (rowBufferMode == RowBufferMode.UNBUFFERED) - { - Preconditions.checkArgument(!batchSizeIsZero, - "If writing in sorted order (ROW_BUFFER_MODE is UNBUFFERED) then BATCH_SIZE " - + "should be non zero, but it was set to 0 in writer options"); - } - else if (!batchSizeIsZero && sstableBatchSize != DEFAULT_BATCH_SIZE_IN_ROWS) - { - LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value ({}) but ROW_BUFFER_MODE is set to BUFFERED." 
- + " Ignoring BATCH_SIZE.", sstableBatchSize); - } - } - /** * Validates the SSL configuration present and throws an exception if it is incorrect * diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java index e4d4bbe..b385289 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java @@ -21,7 +21,6 @@ package org.apache.cassandra.spark.bulkwriter; import java.util.UUID; -import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.jetbrains.annotations.NotNull; @@ -51,23 +50,10 @@ public class CassandraJobInfo implements JobInfo return conf.localDC; } - @NotNull - @Override - public RowBufferMode getRowBufferMode() - { - return conf.rowBufferMode; - } - - @Override - public int getSstableDataSizeInMB() - { - return conf.sstableDataSizeInMB; - } - @Override - public int getSstableBatchSize() + public int sstableDataSizeInMiB() { - return conf.sstableBatchSize; + return conf.sstableDataSizeInMiB; } @Override diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java index 69efb05..91da29a 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java @@ -22,7 +22,6 @@ package org.apache.cassandra.spark.bulkwriter; import java.io.Serializable; import java.util.UUID; -import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import 
org.jetbrains.annotations.NotNull; @@ -33,12 +32,10 @@ public interface JobInfo extends Serializable String getLocalDC(); - @NotNull - RowBufferMode getRowBufferMode(); - - int getSstableDataSizeInMB(); - - int getSstableBatchSize(); + /** + * @return the max sstable data file size in mebibytes + */ + int sstableDataSizeInMiB(); int getCommitBatchSize(); diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java index cc9bef5..908ecba 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java @@ -49,7 +49,6 @@ import com.google.common.collect.Range; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.sidecar.common.data.TimeSkewResponse; import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; @@ -63,16 +62,16 @@ import static org.apache.cassandra.spark.utils.ScalaConversionUtils.asScalaItera public class RecordWriter implements Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(RecordWriter.class); + private final BulkWriterContext writerContext; private final String[] columnNames; - private Supplier<TaskContext> taskContextSupplier; private final BiFunction<BulkWriterContext, Path, SSTableWriter> tableWriterSupplier; - private SSTableWriter sstableWriter = null; - private final BulkWriteValidator writeValidator; private final ReplicaAwareFailureHandler<RingInstance> failureHandler; - private int batchNumber = 0; - private int batchSize = 0; + + private Supplier<TaskContext> taskContextSupplier; + private SSTableWriter sstableWriter = null; + private int outputSequence = 0; // 
sub-folder for possible subrange splits public RecordWriter(BulkWriterContext writerContext, String[] columnNames) { @@ -179,13 +178,12 @@ public class RecordWriter implements Serializable streamSession = maybeCreateStreamSession(taskContext, streamSession, currentRange, failureHandler, results); maybeCreateTableWriter(partitionId, baseDir); writeRow(rowData, valueMap, partitionId, streamSession.getTokenRange()); - checkBatchSize(streamSession, partitionId, job); } // Finalize SSTable for the last StreamSession if (sstableWriter != null) { - finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize); + finalizeSSTable(streamSession, partitionId); results.add(streamSession.close()); } LOGGER.info("[{}] Done with all writers and waiting for stream to complete", partitionId); @@ -256,13 +254,8 @@ public class RecordWriter implements Serializable // Schedule data to be sent if we are processing a batch that has not been scheduled yet. if (streamSession != null) { - // Complete existing batched writes (if any) before the existing stream session is closed - if (batchSize != 0) - { - finalizeSSTable(streamSession, taskContext.partitionId(), sstableWriter, batchNumber, batchSize); - sstableWriter = null; - batchSize = 0; - } + // Complete existing writes (if any) before the existing stream session is closed + finalizeSSTable(streamSession, taskContext.partitionId()); results.add(streamSession.close()); } streamSession = new StreamSession(writerContext, getStreamId(taskContext), matchingSubRange, failureHandler); @@ -360,32 +353,15 @@ public class RecordWriter implements Serializable } } - /** - * Stream to replicas; if batchSize is reached, "finalize" SST to "schedule" streamSession - */ - private void checkBatchSize(StreamSession streamSession, int partitionId, JobInfo job) throws IOException - { - if (job.getRowBufferMode() == RowBufferMode.UNBUFFERED) - { - batchSize++; - if (batchSize >= job.getSstableBatchSize()) - { - 
finalizeSSTable(streamSession, partitionId, sstableWriter, batchNumber, batchSize); - sstableWriter = null; - batchSize = 0; - } - } - } - private void maybeCreateTableWriter(int partitionId, Path baseDir) throws IOException { if (sstableWriter == null) { - Path outDir = Paths.get(baseDir.toString(), Integer.toString(++batchNumber)); + Path outDir = Paths.get(baseDir.toString(), Integer.toString(outputSequence++)); Files.createDirectories(outDir); sstableWriter = tableWriterSupplier.apply(writerContext, outDir); - LOGGER.info("[{}][{}] Created new SSTable writer", partitionId, batchNumber); + LOGGER.info("[{}] Created new SSTable writer", partitionId); } } @@ -399,16 +375,23 @@ public class RecordWriter implements Serializable return map; } + /** + * Close the {@link RecordWriter#sstableWriter} if present. Schedule a stream session with the produced sstables. + * And finally, nullify {@link RecordWriter#sstableWriter} + */ private void finalizeSSTable(StreamSession streamSession, - int partitionId, - SSTableWriter sstableWriter, - int batchNumber, - int batchSize) throws IOException + int partitionId) throws IOException { - LOGGER.info("[{}][{}] Closing writer and scheduling SStable stream with {} rows", - partitionId, batchNumber, batchSize); + if (sstableWriter == null) + { + LOGGER.warn("SSTableWriter is null. 
Nothing to finalize"); + return; + } + LOGGER.info("[{}] Closing writer and scheduling SStable stream", + partitionId); sstableWriter.close(writerContext, partitionId); streamSession.scheduleStream(sstableWriter); + sstableWriter = null; } private StreamSession createStreamSession(TaskContext taskContext) diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java index bc06e0f..18021f2 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java @@ -81,8 +81,7 @@ public class SSTableWriter writerContext.cluster().getPartitioner().toString(), tableSchema.createStatement, tableSchema.modificationStatement, - writerContext.job().getRowBufferMode(), - writerContext.job().getSstableDataSizeInMB()); + writerContext.job().sstableDataSizeInMiB()); } @NotNull diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java index 50c5d47..55cace3 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java @@ -22,7 +22,6 @@ package org.apache.cassandra.spark.bulkwriter; import org.apache.cassandra.bridge.CassandraBridge; import org.apache.cassandra.bridge.CassandraBridgeFactory; import org.apache.cassandra.bridge.CassandraVersionFeatures; -import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.bridge.SSTableWriter; public final class SSTableWriterFactory @@ -37,7 +36,6 @@ public final class SSTableWriterFactory String partitioner, String 
createStatement, String insertStatement, - RowBufferMode rowBufferMode, int bufferSizeMB) { CassandraBridge cassandraBridge = CassandraBridgeFactory.get(serverVersion); @@ -45,7 +43,6 @@ public final class SSTableWriterFactory partitioner, createStatement, insertStatement, - rowBufferMode, bufferSizeMB); } } diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java index 075672f..3dd68dd 100644 --- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java +++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java @@ -27,7 +27,6 @@ public enum WriterOptions implements WriterOption BULK_WRITER_CL, LOCAL_DC, NUMBER_SPLITS, - BATCH_SIZE, COMMIT_THREADS_PER_INSTANCE, COMMIT_BATCH_SIZE, SKIP_EXTENDED_VERIFY, @@ -41,8 +40,9 @@ public enum WriterOptions implements WriterOption TRUSTSTORE_PATH, TRUSTSTORE_BASE64_ENCODED, SIDECAR_PORT, - ROW_BUFFER_MODE, + @Deprecated // the size unit `MB` is incorrect, use `SSTABLE_DATA_SIZE_IN_MIB` instead SSTABLE_DATA_SIZE_IN_MB, + SSTABLE_DATA_SIZE_IN_MIB, TTL, TIMESTAMP, /** diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java index 56caa5b..b55045d 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java @@ -28,7 +28,6 @@ import com.google.common.collect.Maps; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator; import 
org.apache.cassandra.spark.utils.BuildInfo; import org.apache.spark.SparkConf; @@ -215,59 +214,6 @@ public class BulkSparkConfTest + "Please provide option " + WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage()); } - @Test - public void testUnbufferedRowBufferMode() - { - Map<String, String> options = copyDefaultOptions(); - options.put(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED"); - BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); - assertNotNull(bulkSparkConf); - assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.UNBUFFERED); - } - - @Test - public void testBufferedRowBufferMode() - { - Map<String, String> options = copyDefaultOptions(); - options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED"); - BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); - assertNotNull(bulkSparkConf); - assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED); - } - - @Test - public void testInvalidRowBufferMode() - { - Map<String, String> options = copyDefaultOptions(); - options.put(WriterOptions.ROW_BUFFER_MODE.name(), "invalid"); - IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, - () -> new BulkSparkConf(sparkConf, options)); - assertEquals("Key row buffering mode with value invalid is not a valid Enum of type class org.apache.cassandra.bridge.RowBufferMode.", - exception.getMessage()); - } - - @Test - public void testBufferedRowBufferModeWithZeroBatchSize() - { - Map<String, String> options = copyDefaultOptions(); - options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED"); - options.put(WriterOptions.BATCH_SIZE.name(), "0"); - BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); - assertNotNull(bulkSparkConf); - assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED); - } - - @Test - public void testNonZeroBatchSizeIsIgnoredWithBufferedRowBufferMode() - { - Map<String, String> options = copyDefaultOptions(); - 
options.put(WriterOptions.BATCH_SIZE.name(), "5"); - options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED"); - BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options); - assertNotNull(bulkSparkConf); - assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED); - } - @Test void testQuoteIdentifiers() { diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java index 1f589da..e864dac 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java @@ -40,7 +40,6 @@ import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.tuple.Pair; import org.apache.cassandra.bridge.CassandraBridgeFactory; -import org.apache.cassandra.bridge.RowBufferMode; import org.apache.cassandra.sidecar.common.data.TimeSkewResponse; import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel; import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping; @@ -56,7 +55,6 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner; import org.apache.cassandra.spark.validation.StartupValidator; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; -import org.jetbrains.annotations.NotNull; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE; import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT; @@ -74,7 +72,6 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType, DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType}, new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), mockCqlType(VARCHAR), mockCqlType(INT)}); 
private final boolean quoteIdentifiers; - private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED; private ConsistencyLevel.CL consistencyLevel; private int sstableDataSizeInMB = 128; private int sstableWriteBatchSize = 2; @@ -219,20 +216,8 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo return "DC1"; } - @NotNull @Override - public RowBufferMode getRowBufferMode() - { - return rowBufferMode; - } - - public void setRowBufferMode(RowBufferMode rowBufferMode) - { - this.rowBufferMode = rowBufferMode; - } - - @Override - public int getSstableDataSizeInMB() + public int sstableDataSizeInMiB() { return sstableDataSizeInMB; } @@ -243,19 +228,6 @@ public class MockBulkWriterContext implements BulkWriterContext, ClusterInfo, Jo this.sstableDataSizeInMB = sstableDataSizeInMB; } - @Override - public int getSstableBatchSize() - { - return sstableWriteBatchSize; - } - - @VisibleForTesting - void setSstableWriteBatchSize(int sstableWriteBatchSize) - { - this.sstableWriteBatchSize = sstableWriteBatchSize; - } - - @Override public int getCommitBatchSize() { return 1; diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java index b2fa210..92fda4c 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java @@ -76,10 +76,9 @@ public class RecordWriterTest { private static final int REPLICA_COUNT = 3; private static final int FILES_PER_SSTABLE = 8; - // writing 5 rows with batch size of 2 should produce 3 sstable - private static final int UPLOADED_SSTABLES = 3; - private static final int ROWS_COUNT = 5; - private static final int BATCH_SIZE = 2; + // writing 270 rows with sstable size cap of 1 MB should produce 2 sstable + private 
static final int UPLOADED_SSTABLES = 2; + private static final int ROWS_COUNT = 270; private static final String[] COLUMN_NAMES = { "id", "date", "course", "marks" }; @@ -101,7 +100,7 @@ public class RecordWriterTest tw = new MockTableWriter(folder.getRoot()); tokenRangeMapping = TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 3), 12); writerContext = new MockBulkWriterContext(tokenRangeMapping); - writerContext.setSstableWriteBatchSize(BATCH_SIZE); // create a new sstable after writing 2 rows + writerContext.setSstableDataSizeInMB(1); // defaults to the minimum sstable data size allowed to set tc = new TestTaskContext(); range = writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId()); tokenizer = new Tokenizer(writerContext); @@ -138,7 +137,7 @@ public class RecordWriterTest rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new); when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); assertThat(ex.getMessage(), containsString("Token range mappings have changed since the task started")); } @@ -168,7 +167,7 @@ public class RecordWriterTest when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping); when(m.getInstanceAvailability()).thenReturn(availability); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); rw.write(data); Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); // Should not upload to blocked instances @@ -189,7 +188,7 @@ public class RecordWriterTest when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping); when(m.getInstanceAvailability()).thenCallRealMethod(); - Iterator<Tuple2<DecoratedKey, Object[]>> data = 
generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); rw.write(data); Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas @@ -198,7 +197,7 @@ public class RecordWriterTest @Test public void testSuccessfulWrite() throws InterruptedException { - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); } @@ -221,7 +220,8 @@ public class RecordWriterTest pk, pk, quoteIdentifiers); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + writerContext.setSstableDataSizeInMB(1); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); validateSuccessfulWrite(writerContext, data, columnNames); } @@ -229,7 +229,7 @@ public class RecordWriterTest public void testSuccessfulWriteCheckUploads() { rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); rw.write(data); Map<CassandraInstance, List<UploadRequest>> uploads = writerContext.getUploads(); assertThat(uploads.keySet().size(), is(REPLICA_COUNT)); // Should upload to 3 replicas @@ -244,53 +244,48 @@ public class RecordWriterTest @Test public void testWriteWithConstantTTL() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, false, false); - validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, false); + validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); } @Test public void testWriteWithTTLColumn() throws InterruptedException { - 
MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, true, false); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, false); String[] columnNamesWithTtl = { "id", "date", "course", "marks", "ttl" }; - validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl); + validateSuccessfulWrite(writerContext, data, columnNamesWithTtl); } @Test public void testWriteWithConstantTimestamp() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, false, false); - validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, false); + validateSuccessfulWrite(writerContext, data, COLUMN_NAMES); } @Test public void testWriteWithTimestampColumn() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, false, true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, true); String[] columnNamesWithTimestamp = { "id", "date", "course", "marks", "timestamp" }; - validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTimestamp); + validateSuccessfulWrite(writerContext, data, columnNamesWithTimestamp); } @Test public void testWriteWithTimestampAndTTLColumn() throws InterruptedException { - MockBulkWriterContext bulkWriterContext = new MockBulkWriterContext(tokenRangeMapping); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, true, true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, true); String[] columnNames = { "id", "date", "course", "marks", "ttl", "timestamp" }; - validateSuccessfulWrite(bulkWriterContext, data, columnNames); + 
validateSuccessfulWrite(writerContext, data, columnNames); } @Test @@ -390,7 +385,7 @@ public class RecordWriterTest public void testCorruptSSTable() { rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw.setOutDir(path), path)); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); // TODO: Add better error handling with human-readable exception messages in SSTableReader::new // That way we can assert on the exception thrown here RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); @@ -399,9 +394,8 @@ public class RecordWriterTest @Test public void testWriteWithOutOfRangeTokenFails() { - writerContext.setSstableWriteBatchSize(ROWS_COUNT + 100); // Write all rows in the same sstable rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder)); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, Range.all(), false, false, false); RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); String expectedErr = "java.lang.IllegalStateException: Received Token " + "5765203080415074583 outside the expected ranges [(-9223372036854775808‥100000]]"; @@ -413,7 +407,7 @@ public class RecordWriterTest { rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder)); tw.setAddRowThrows(true); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); assertEquals("java.lang.RuntimeException: Failed to write because addRow throws", ex.getMessage()); } @@ -425,7 +419,7 @@ public class RecordWriterTest long sixtyOneMinutesInMillis = TimeUnit.MINUTES.toMillis(61); rw = new 
RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, path) -> new SSTableWriter(tw, folder)); writerContext.setTimeProvider(() -> System.currentTimeMillis() - sixtyOneMinutesInMillis); - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); RuntimeException ex = assertThrows(RuntimeException.class, () -> rw.write(data)); assertThat(ex.getMessage(), startsWith("Time skew between Spark and Cassandra is too large. Allowable skew is 60 minutes. Spark executor time is ")); } @@ -438,7 +432,7 @@ public class RecordWriterTest long remoteTime = System.currentTimeMillis() - fiftyNineMinutesInMillis; rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, SSTableWriter::new); writerContext.setTimeProvider(() -> remoteTime); // Return a very low "current time" to make sure we fail if skew is too bad - Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true); + Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(); rw.write(data); } @@ -473,19 +467,16 @@ public class RecordWriterTest } } - private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(boolean onlyInRange) + private Iterator<Tuple2<DecoratedKey, Object[]>> generateData() { - return generateData(onlyInRange, false, false); + return generateData(false, false); } - private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(boolean onlyInRange, boolean withTTL, boolean withTimestamp) + private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(boolean withTTL, boolean withTimestamp) { - return onlyInRange - ? generateData(ROWS_COUNT, range, false, withTTL, withTimestamp) // accept only tokens in range - : generateData(ROWS_COUNT, Range.all(), false, withTTL, withTimestamp); // accept all tokens + return generateData(ROWS_COUNT, range, false, withTTL, withTimestamp); } - // generate data with fake tokens assigend. The fake tokens are provided by the input range. 
// Although the data generated have fake tokens, the actual tokens computed from each tuple // are still ordered ascendingly. @@ -508,34 +499,35 @@ public class RecordWriterTest boolean fakeTokens, boolean withTTL, boolean withTimestamp) { + String courseString = IntStream.range(0, 100000).boxed().map(i -> "Long long string").collect(Collectors.joining()); Stream<Tuple2<DecoratedKey, Object[]>> source = IntStream.iterate(0, integer -> integer + 1).mapToObj(index -> { Object[] columns; if (withTTL && withTimestamp) { columns = new Object[] { - index, index, "foo" + index, index, index * 100, System.currentTimeMillis() * 1000 + index, index, courseString, index, index * 100, System.currentTimeMillis() * 1000 }; } else if (withTimestamp) { columns = new Object[] { - index, index, "foo" + index, index, System.currentTimeMillis() * 1000 + index, index, courseString, index, System.currentTimeMillis() * 1000 }; } else if (withTTL) { columns = new Object[] { - index, index, "foo" + index, index, index * 100 + index, index, courseString, index, index * 100 }; } else { columns = new Object[] { - index, index, "foo" + index, index + index, index, courseString, index }; } return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns); diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index cfc7057..6c0240c 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -390,7 +390,6 @@ public abstract class CassandraBridge String partitioner, String createStatement, String insertStatement, - RowBufferMode rowBufferMode, int bufferSizeMB); public interface IRow diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java deleted 
file mode 100644 index 70be08c..0000000 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.cassandra.bridge; - -/** - * Configures how data is flushed to an SSTable - */ -public enum RowBufferMode -{ - /** - * In this mode, Cassandra will flush an SSTable to disk once it reaches the configured BufferSizeInMB. - * This parameter is configured by the user-configurable SSTABLE_DATA_SIZE_IN_MB WriterOption. Note: - * This is the uncompressed size of data before being written to disk, and the actual size of an SSTable - * can be smaller based on the compression configuration for the SSTable and how compressible the data is. - */ - BUFFERED, - - /** - * Cassandra expects rows in sorted order and will not flush an SSTable automatically. The size of an - * SSTable is based on the number of rows we write to the SSTable. This parameter is configured by the - * user-configurable BATCH_SIZE WriterOption. 
- */ - UNBUFFERED -} diff --git a/cassandra-four-zero/build.gradle b/cassandra-four-zero/build.gradle index b93e8fb..9b90451 100644 --- a/cassandra-four-zero/build.gradle +++ b/cassandra-four-zero/build.gradle @@ -29,7 +29,7 @@ project(':cassandra-four-zero') { compileOnly(group: "${sparkGroupId}", name: "spark-core_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") compileOnly(group: "${sparkGroupId}", name: "spark-sql_${scalaMajorVersion}", version: "${project.rootProject.sparkVersion}") - implementation(group: 'org.apache.cassandra', name: 'cassandra-all', version: '4.0.2') { + implementation(group: 'org.apache.cassandra', name: 'cassandra-all', version: '4.0.12') { // Exclude JNA libraries from the cassandra-all dependency tree because Spark has its own version // and trying to load two different versions causes issues with the native libraries exclude(group: 'net.java.dev.jna') diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java index 68a896c..f1274ad 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java @@ -51,8 +51,10 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.commitlog.CommitLogSegmentManagerStandard; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.ByteBufferAccessor; import org.apache.cassandra.db.marshal.CompositeType; @@ -164,6 +166,7 @@ public class 
CassandraBridgeImplementation extends CassandraBridge throw new RuntimeException(exception); } config.data_file_directories = new String[]{tempDirectory.toString()}; + setupCommitLogConfigs(tempDirectory); DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch()); Keyspace.setInitialized(); @@ -171,6 +174,23 @@ public class CassandraBridgeImplementation extends CassandraBridge } } + public static void setupCommitLogConfigs(Path path) + { + String commitLogPath = path + "/commitlog"; + DatabaseDescriptor.getRawConfig().commitlog_directory = commitLogPath; + DatabaseDescriptor.getRawConfig().hints_directory = path + "/hints"; + DatabaseDescriptor.getRawConfig().saved_caches_directory = path + "/saved_caches"; + DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic); + DatabaseDescriptor.setCommitLogCompression(new ParameterizedClass("LZ4Compressor", + Collections.emptyMap())); + DatabaseDescriptor.setCommitLogSyncPeriod(30); + DatabaseDescriptor.setCommitLogMaxCompressionBuffersPerPool(3); + DatabaseDescriptor.setCommitLogSyncGroupWindow(30); + DatabaseDescriptor.setCommitLogSegmentSize(32); + DatabaseDescriptor.getRawConfig().commitlog_total_space_in_mb = 1024; + DatabaseDescriptor.setCommitLogSegmentMgrProvider(commitLog -> new CommitLogSegmentManagerStandard(commitLog, commitLogPath)); + } + public CassandraBridgeImplementation() { // Cassandra-version-specific Kryo serializers @@ -581,10 +601,9 @@ public class CassandraBridgeImplementation extends CassandraBridge String partitioner, String createStatement, String insertStatement, - RowBufferMode rowBufferMode, int bufferSizeMB) { - return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, rowBufferMode, bufferSizeMB); + return new SSTableWriterImplementation(inDirectory, partitioner, createStatement, insertStatement, bufferSizeMB); } // Version-Specific Test Utility Methods diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java index facfad6..89e14f1 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java @@ -44,7 +44,6 @@ public class SSTableWriterImplementation implements SSTableWriter String partitioner, String createStatement, String insertStatement, - RowBufferMode rowBufferMode, int bufferSizeMB) { IPartitioner cassPartitioner = partitioner.toLowerCase().contains("random") ? new RandomPartitioner() @@ -53,7 +52,6 @@ public class SSTableWriterImplementation implements SSTableWriter CQLSSTableWriter.Builder builder = configureBuilder(inDirectory, createStatement, insertStatement, - rowBufferMode, bufferSizeMB, cassPartitioner); // TODO: Remove me once CQLSSTableWriter.Builder synchronize on schema (see CASSANDRA-TBD) @@ -84,23 +82,18 @@ public class SSTableWriterImplementation implements SSTableWriter static CQLSSTableWriter.Builder configureBuilder(String inDirectory, String createStatement, String insertStatement, - RowBufferMode rowBufferMode, int bufferSizeMB, IPartitioner cassPartitioner) { - CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder() - .inDirectory(inDirectory) - .forTable(createStatement) - .withPartitioner(cassPartitioner) - .using(insertStatement); - if (rowBufferMode == RowBufferMode.UNBUFFERED) - { - builder.sorted(); - } - else if (rowBufferMode == RowBufferMode.BUFFERED) - { - builder.withBufferSizeInMB(bufferSizeMB); - } - return builder; + return CQLSSTableWriter + .builder() + .inDirectory(inDirectory) + .forTable(createStatement) + .withPartitioner(cassPartitioner) + .using(insertStatement) + // The data frame to write is always sorted, + // see 
org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation.insert + .sorted() + .withMaxSSTableSizeInMiB(bufferSizeMB); } } diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java index aa39324..32ff705 100644 --- a/cassandra-four-zero/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java +++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java @@ -32,6 +32,8 @@ import java.util.SortedSet; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.bridge.CassandraSchema; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.ColumnSpecification; @@ -70,7 +72,9 @@ import org.apache.cassandra.utils.ByteBufferUtil; /** * Re-write of CQLSSTableWriter for writing tombstones to an SSTable for testing + * Used for testing purpose only */ +@VisibleForTesting public final class SSTableTombstoneWriter implements Closeable { private static final ByteBuffer UNSET_VALUE = ByteBufferUtil.UNSET_BYTE_BUFFER; @@ -287,7 +291,6 @@ public final class SSTableTombstoneWriter implements Closeable private ModificationStatement.Parsed deleteStatement; private IPartitioner partitioner; - private boolean sorted = false; private long bufferSizeInMB = 128; Builder() @@ -440,9 +443,9 @@ public final class SSTableTombstoneWriter implements Closeable DeleteStatement preparedDelete = prepareDelete(); TableMetadataRef ref = TableMetadataRef.forOfflineTools(tableMetadata); - AbstractSSTableSimpleWriter writer = sorted - ? 
new SSTableSimpleWriter(directory, ref, preparedDelete.updatedColumns()) - : new SSTableSimpleUnsortedWriter(directory, ref, preparedDelete.updatedColumns(), bufferSizeInMB); + AbstractSSTableSimpleWriter writer = new SSTableSimpleUnsortedWriter(directory, ref, + preparedDelete.updatedColumns(), + bufferSizeInMB); if (formatType != null) { diff --git a/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java index 6ede1d0..e5e5846 100644 --- a/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java +++ b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java @@ -21,6 +21,7 @@ package org.apache.cassandra.bridge; import java.io.File; import java.lang.reflect.Field; +import java.util.Arrays; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -30,8 +31,6 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter; import org.apache.cassandra.utils.ReflectionUtils; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -46,32 +45,16 @@ class SSTableWriterImplementationTest File writeDirectory; @Test - void testUnbufferedRowBufferModeConfiguration() throws NoSuchFieldException, IllegalAccessException + void testSSTableWriterConfiguration() throws NoSuchFieldException, IllegalAccessException { CQLSSTableWriter.Builder builder = SSTableWriterImplementation.configureBuilder(writeDirectory.getAbsolutePath(), CREATE_STATEMENT, INSERT_STATEMENT, - RowBufferMode.UNBUFFERED, 250, new Murmur3Partitioner()); assertTrue(peekSorted(builder)); - assertNotEquals(250, peekBufferSizeInMB(builder)); // 250 should not be set - } - - @Test - 
void testBufferedRowBufferModeConfiguration() throws NoSuchFieldException, IllegalAccessException - { - CQLSSTableWriter.Builder builder = SSTableWriterImplementation.configureBuilder(writeDirectory.getAbsolutePath(), - CREATE_STATEMENT, - INSERT_STATEMENT, - RowBufferMode.BUFFERED, - 250, - new Murmur3Partitioner()); - - - assertFalse(peekSorted(builder)); assertEquals(250, peekBufferSizeInMB(builder)); } @@ -84,20 +67,34 @@ class SSTableWriterImplementationTest static long peekBufferSizeInMB(CQLSSTableWriter.Builder builder) throws NoSuchFieldException, IllegalAccessException { - Field bufferSizeInMBField; - try + // The name of the size field has been changed in Cassandra code base. + // We find the field using the old name to newer one. + Field sizeField = findFirstField(builder.getClass(), + "bufferSizeInMB", "bufferSizeInMiB", "maxSSTableSizeInMiB"); + sizeField.setAccessible(true); + return (long) sizeField.get(builder); + } + + static Field findFirstField(Class<?> clazz, String... fieldNames) throws NoSuchFieldException, IllegalAccessException + { + Field field = null; + for (String fieldName : fieldNames) { - bufferSizeInMBField = ReflectionUtils.getField(builder.getClass(), "bufferSizeInMB"); + try + { + field = ReflectionUtils.getField(clazz, fieldName); + } + catch (NoSuchFieldException nsfe) + { + // ignore the exception and try with the next fieldName + } } - catch (NoSuchFieldException noSuchFieldException) + + if (field == null) { - // The bufferSizeInMB field has been renamed to bufferSizeInMiB in trunk, so we expect this to - // fail at some point, and we have a way to recover from the failure without causing the test - // to fail. 
- bufferSizeInMBField = ReflectionUtils.getField(builder.getClass(), "bufferSizeInMiB"); + throw new NoSuchFieldException("The class does not contain any of the supplied fieldNames: " + Arrays.asList(fieldNames)); } - bufferSizeInMBField.setAccessible(true); - return (long) bufferSizeInMBField.get(builder); + return field; } } diff --git a/scripts/build-dtest-jars.sh b/scripts/build-dtest-jars.sh index 40e461a..f0ada1e 100755 --- a/scripts/build-dtest-jars.sh +++ b/scripts/build-dtest-jars.sh @@ -38,8 +38,8 @@ else # "cassandra-4.0:cassandra-4.0" # Due to MacOS being stuck on Bash < 4, we don't use associative arrays here. CANDIDATE_BRANCHES=( - "cassandra-4.0:1f79c8492528f01bcc5f88951a1cc9e0d7265c54" - "cassandra-4.1:725655dda2776fef35567496a6e331102eb7610d" + "cassandra-4.0:cassandra-4.0.12" + "cassandra-4.1:99d9faeef57c9cf5240d11eac9db5b283e45a4f9" ) BRANCHES=( ${BRANCHES:-cassandra-4.0 cassandra-4.1} ) echo ${BRANCHES[*]} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org