This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e0ae9d7  CASSANDRA-19334 Upgrade to Cassandra 4.0.12 and remove 
BufferMode and BatchSize options
e0ae9d7 is described below

commit e0ae9d7484e242f6af495aac2cb4d8dc121fba89
Author: Yifan Cai <y...@apache.org>
AuthorDate: Wed Jan 24 15:38:41 2024 -0800

    CASSANDRA-19334 Upgrade to Cassandra 4.0.12 and remove BufferMode and 
BatchSize options
    
    In cassandra-all:4.0.12, improvements were made to CQLSSTableWriter.
    The sorted writer can now produce size-capped SSTables, which removes
    the need for the unsorted sstable writer that has to buffer and sort
    data on flush. The dataset written by the Spark application is already
    sorted, so avoiding the unsorted writer prevents wasting CPU time on
    re-sorting already-sorted data. Since the sorted sstable writer does
    not need to buffer data, its size estimation is more accurat [...]
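    
    For illustration, the sorted, size-capped writer is configured roughly
    as below (a sketch built from the CQLSSTableWriter builder calls used in
    this patch; the directory, table/insert statements, and the 160 MiB cap
    are placeholder values, not the exact code in SSTableWriterImplementation):
    
        import org.apache.cassandra.dht.Murmur3Partitioner;
        import org.apache.cassandra.io.sstable.CQLSSTableWriter;
    
        CQLSSTableWriter writer =
            CQLSSTableWriter.builder()
                            .inDirectory("/tmp/sstables")            // output directory for produced sstables
                            .forTable("CREATE TABLE ks.tbl (id int PRIMARY KEY, marks int)")
                            .withPartitioner(new Murmur3Partitioner())
                            .using("INSERT INTO ks.tbl (id, marks) VALUES (?, ?)")
                            .sorted()                                // rows arrive already sorted by token
                            .withMaxSSTableSizeInMiB(160)            // roll to a new sstable once the cap is reached
                            .build();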
    
    With the unsorted sstable writer removed, the RowBufferMode option is
    no longer required.
    With size-capping supported by the sorted writer, the BatchSize option
    is no longer required.
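    
    On the user side, the change is limited to the writer options, for
    example (a sketch; option names come from WriterOptions, while sparkConf
    and the copyDefaultOptions() helper are assumed to be set up as in
    BulkSparkConfTest):
    
        Map<String, String> options = copyDefaultOptions();                 // test helper from BulkSparkConfTest
        // previously: options.put(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED");
        //             options.put(WriterOptions.BATCH_SIZE.name(), "1000000");
        options.put(WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name(), "160");  // size cap per sstable, in mebibytes
        // SSTABLE_DATA_SIZE_IN_MB is still accepted but deprecated and logs a warning
        BulkSparkConf conf = new BulkSparkConf(sparkConf, options);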
    
    Patch by Yifan Cai; reviewed by Francisco Guerrero for CASSANDRA-19334
---
 CHANGES.txt                                        |  1 +
 .../cassandra/spark/bulkwriter/BulkSparkConf.java  | 53 ++++++++-------
 .../spark/bulkwriter/CassandraJobInfo.java         | 18 +----
 .../apache/cassandra/spark/bulkwriter/JobInfo.java | 11 ++-
 .../cassandra/spark/bulkwriter/RecordWriter.java   | 63 +++++++----------
 .../cassandra/spark/bulkwriter/SSTableWriter.java  |  3 +-
 .../spark/bulkwriter/SSTableWriterFactory.java     |  3 -
 .../cassandra/spark/bulkwriter/WriterOptions.java  |  4 +-
 .../spark/bulkwriter/BulkSparkConfTest.java        | 54 ---------------
 .../spark/bulkwriter/MockBulkWriterContext.java    | 30 +--------
 .../spark/bulkwriter/RecordWriterTest.java         | 78 ++++++++++------------
 .../apache/cassandra/bridge/CassandraBridge.java   |  1 -
 .../org/apache/cassandra/bridge/RowBufferMode.java | 41 ------------
 cassandra-four-zero/build.gradle                   |  2 +-
 .../bridge/CassandraBridgeImplementation.java      | 23 ++++++-
 .../bridge/SSTableWriterImplementation.java        | 27 +++-----
 .../io/sstable/SSTableTombstoneWriter.java         | 11 +--
 .../bridge/SSTableWriterImplementationTest.java    | 55 ++++++++-------
 scripts/build-dtest-jars.sh                        |  4 +-
 19 files changed, 162 insertions(+), 320 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7e60c77..b105092 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Upgrade to Cassandra 4.0.12 and remove RowBufferMode and BatchSize options 
(CASSANDRA-19334)
  * Improve logging for bulk writes and on task failures (CASSANDRA-19331)
  * Allow setting TTL for snapshots created by Analytics bulk reader 
(CASSANDRA-19273)
  * Fix range split and use open-closed range notation consistently 
(CASSANDRA-19325)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 8c97b07..1a16ec3 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
-import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.clients.SidecarInstanceImpl;
 import org.apache.cassandra.sidecar.client.SidecarInstance;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
@@ -83,8 +82,7 @@ public class BulkSparkConf implements Serializable
     public static final long DEFAULT_SIDECAR_REQUEST_MAX_RETRY_DELAY_MILLIS = 
TimeUnit.SECONDS.toMillis(60L);
     public static final int DEFAULT_COMMIT_BATCH_SIZE = 10_000;
     public static final int DEFAULT_RING_RETRY_COUNT = 3;
-    public static final RowBufferMode DEFAULT_ROW_BUFFER_MODE = 
RowBufferMode.UNBUFFERED;
-    public static final int DEFAULT_BATCH_SIZE_IN_ROWS = 1_000_000;
+    public static final int DEFAULT_SSTABLE_DATA_SIZE_IN_MIB = 160;
 
     // NOTE: All Cassandra Analytics setting names must start with "spark" in 
order to not be ignored by Spark,
     //       and must not start with "spark.cassandra" so as to not conflict 
with Spark Cassandra Connector
@@ -108,9 +106,7 @@ public class BulkSparkConf implements Serializable
     public final ConsistencyLevel.CL consistencyLevel;
     public final String localDC;
     public final Integer numberSplits;
-    public final RowBufferMode rowBufferMode;
-    public final Integer sstableDataSizeInMB;
-    public final Integer sstableBatchSize;
+    public final Integer sstableDataSizeInMiB;
     public final int commitBatchSize;
     public final boolean skipExtendedVerify;
     public final WriteMode writeMode;
@@ -147,9 +143,7 @@ public class BulkSparkConf implements Serializable
         this.consistencyLevel = 
ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, 
WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
         this.localDC = MapUtils.getOrDefault(options, 
WriterOptions.LOCAL_DC.name(), null);
         this.numberSplits = MapUtils.getInt(options, 
WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
-        this.rowBufferMode = MapUtils.getEnumOption(options, 
WriterOptions.ROW_BUFFER_MODE.name(), DEFAULT_ROW_BUFFER_MODE, "row buffering 
mode");
-        this.sstableDataSizeInMB = MapUtils.getInt(options, 
WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(), 160, "sstable data size in MB");
-        this.sstableBatchSize = MapUtils.getInt(options, 
WriterOptions.BATCH_SIZE.name(), 1_000_000, "sstable batch size");
+        this.sstableDataSizeInMiB = resolveSSTableDataSizeInMiB(options);
         this.commitBatchSize = MapUtils.getInt(options, 
WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit 
batch size");
         this.commitThreadsPerInstance = MapUtils.getInt(options, 
WriterOptions.COMMIT_THREADS_PER_INSTANCE.name(), 2, "commit threads per 
instance");
         this.keystorePassword = MapUtils.getOrDefault(options, 
WriterOptions.KEYSTORE_PASSWORD.name(), null);
@@ -179,6 +173,29 @@ public class BulkSparkConf implements Serializable
                      .collect(Collectors.toSet());
     }
 
+    protected int resolveSSTableDataSizeInMiB(Map<String, String> options)
+    {
+        int legacyOptionValue = -1;
+        if (options.containsKey(WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name()))
+        {
+            LOGGER.warn("The writer option: SSTABLE_DATA_SIZE_IN_MB is 
deprecated. " +
+                        "Please use SSTABLE_DATA_SIZE_IN_MIB instead. See 
option description for details.");
+            legacyOptionValue = MapUtils.getInt(options, 
WriterOptions.SSTABLE_DATA_SIZE_IN_MB.name(),
+                                                
DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable data size in mebibytes");
+        }
+
+        if (options.containsKey(WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name()))
+        {
+            LOGGER.info("The writer option: SSTABLE_DATA_SIZE_IN_MIB is 
defined. " +
+                        "Favor the value over SSTABLE_DATA_SIZE_IN_MB");
+            return MapUtils.getInt(options, 
WriterOptions.SSTABLE_DATA_SIZE_IN_MIB.name(),
+                                   DEFAULT_SSTABLE_DATA_SIZE_IN_MIB, "sstable 
data size in mebibytes");
+
+        }
+
+        return legacyOptionValue == -1 ? DEFAULT_SSTABLE_DATA_SIZE_IN_MIB : 
legacyOptionValue;
+    }
+
     protected Set<? extends SidecarInstance> buildSidecarInstances(Map<String, 
String> options, int sidecarPort)
     {
         String sidecarInstances = MapUtils.getOrThrow(options, 
WriterOptions.SIDECAR_INSTANCES.name(), "sidecar_instances");
@@ -193,27 +210,9 @@ public class BulkSparkConf implements Serializable
         Preconditions.checkNotNull(table);
         Preconditions.checkArgument(getHttpResponseTimeoutMs() > 0, 
HTTP_RESPONSE_TIMEOUT + " must be > 0");
         validateSslConfiguration();
-        validateTableWriterSettings();
         CassandraBridgeFactory.validateBridges();
     }
 
-    protected void validateTableWriterSettings()
-    {
-        boolean batchSizeIsZero = sstableBatchSize == 0;
-
-        if (rowBufferMode == RowBufferMode.UNBUFFERED)
-        {
-            Preconditions.checkArgument(!batchSizeIsZero,
-                                        "If writing in sorted order 
(ROW_BUFFER_MODE is UNBUFFERED) then BATCH_SIZE "
-                                        + "should be non zero, but it was set 
to 0 in writer options");
-        }
-        else if (!batchSizeIsZero && sstableBatchSize != 
DEFAULT_BATCH_SIZE_IN_ROWS)
-        {
-            LOGGER.warn("BATCH_SIZE is set to a non-zero, non-default value 
({}) but ROW_BUFFER_MODE is set to BUFFERED."
-                        + " Ignoring BATCH_SIZE.", sstableBatchSize);
-        }
-    }
-
     /**
      * Validates the SSL configuration present and throws an exception if it 
is incorrect
      *
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
index e4d4bbe..b385289 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraJobInfo.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.spark.bulkwriter;
 
 import java.util.UUID;
 
-import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.jetbrains.annotations.NotNull;
 
@@ -51,23 +50,10 @@ public class CassandraJobInfo implements JobInfo
         return conf.localDC;
     }
 
-    @NotNull
-    @Override
-    public RowBufferMode getRowBufferMode()
-    {
-        return conf.rowBufferMode;
-    }
-
-    @Override
-    public int getSstableDataSizeInMB()
-    {
-        return conf.sstableDataSizeInMB;
-    }
-
     @Override
-    public int getSstableBatchSize()
+    public int sstableDataSizeInMiB()
     {
-        return conf.sstableBatchSize;
+        return conf.sstableDataSizeInMiB;
     }
 
     @Override
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index 69efb05..91da29a 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.spark.bulkwriter;
 import java.io.Serializable;
 import java.util.UUID;
 
-import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.jetbrains.annotations.NotNull;
 
@@ -33,12 +32,10 @@ public interface JobInfo extends Serializable
 
     String getLocalDC();
 
-    @NotNull
-    RowBufferMode getRowBufferMode();
-
-    int getSstableDataSizeInMB();
-
-    int getSstableBatchSize();
+    /**
+     * @return the max sstable data file size in mebibytes
+     */
+    int sstableDataSizeInMiB();
 
     int getCommitBatchSize();
 
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index cc9bef5..908ecba 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -49,7 +49,6 @@ import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
@@ -63,16 +62,16 @@ import static 
org.apache.cassandra.spark.utils.ScalaConversionUtils.asScalaItera
 public class RecordWriter implements Serializable
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RecordWriter.class);
+
     private final BulkWriterContext writerContext;
     private final String[] columnNames;
-    private Supplier<TaskContext> taskContextSupplier;
     private final BiFunction<BulkWriterContext, Path, SSTableWriter> 
tableWriterSupplier;
-    private SSTableWriter sstableWriter = null;
-
     private final BulkWriteValidator writeValidator;
     private final ReplicaAwareFailureHandler<RingInstance> failureHandler;
-    private int batchNumber = 0;
-    private int batchSize = 0;
+
+    private Supplier<TaskContext> taskContextSupplier;
+    private SSTableWriter sstableWriter = null;
+    private int outputSequence = 0; // sub-folder for possible subrange splits
 
     public RecordWriter(BulkWriterContext writerContext, String[] columnNames)
     {
@@ -179,13 +178,12 @@ public class RecordWriter implements Serializable
                 streamSession = maybeCreateStreamSession(taskContext, 
streamSession, currentRange, failureHandler, results);
                 maybeCreateTableWriter(partitionId, baseDir);
                 writeRow(rowData, valueMap, partitionId, 
streamSession.getTokenRange());
-                checkBatchSize(streamSession, partitionId, job);
             }
 
             // Finalize SSTable for the last StreamSession
             if (sstableWriter != null)
             {
-                finalizeSSTable(streamSession, partitionId, sstableWriter, 
batchNumber, batchSize);
+                finalizeSSTable(streamSession, partitionId);
                 results.add(streamSession.close());
             }
             LOGGER.info("[{}] Done with all writers and waiting for stream to 
complete", partitionId);
@@ -256,13 +254,8 @@ public class RecordWriter implements Serializable
             // Schedule data to be sent if we are processing a batch that has 
not been scheduled yet.
             if (streamSession != null)
             {
-                // Complete existing batched writes (if any) before the 
existing stream session is closed
-                if (batchSize != 0)
-                {
-                    finalizeSSTable(streamSession, taskContext.partitionId(), 
sstableWriter, batchNumber, batchSize);
-                    sstableWriter = null;
-                    batchSize = 0;
-                }
+                // Complete existing writes (if any) before the existing 
stream session is closed
+                finalizeSSTable(streamSession, taskContext.partitionId());
                 results.add(streamSession.close());
             }
             streamSession = new StreamSession(writerContext, 
getStreamId(taskContext), matchingSubRange, failureHandler);
@@ -360,32 +353,15 @@ public class RecordWriter implements Serializable
         }
     }
 
-    /**
-     * Stream to replicas; if batchSize is reached, "finalize" SST to 
"schedule" streamSession
-     */
-    private void checkBatchSize(StreamSession streamSession, int partitionId, 
JobInfo job) throws IOException
-    {
-        if (job.getRowBufferMode() == RowBufferMode.UNBUFFERED)
-        {
-            batchSize++;
-            if (batchSize >= job.getSstableBatchSize())
-            {
-                finalizeSSTable(streamSession, partitionId, sstableWriter, 
batchNumber, batchSize);
-                sstableWriter = null;
-                batchSize = 0;
-            }
-        }
-    }
-
     private void maybeCreateTableWriter(int partitionId, Path baseDir) throws 
IOException
     {
         if (sstableWriter == null)
         {
-            Path outDir = Paths.get(baseDir.toString(), 
Integer.toString(++batchNumber));
+            Path outDir = Paths.get(baseDir.toString(), 
Integer.toString(outputSequence++));
             Files.createDirectories(outDir);
 
             sstableWriter = tableWriterSupplier.apply(writerContext, outDir);
-            LOGGER.info("[{}][{}] Created new SSTable writer", partitionId, 
batchNumber);
+            LOGGER.info("[{}] Created new SSTable writer", partitionId);
         }
     }
 
@@ -399,16 +375,23 @@ public class RecordWriter implements Serializable
         return map;
     }
 
+    /**
+     * Close the {@link RecordWriter#sstableWriter} if present. Schedule a 
stream session with the produced sstables.
+     * And finally, nullify {@link RecordWriter#sstableWriter}
+     */
     private void finalizeSSTable(StreamSession streamSession,
-                                 int partitionId,
-                                 SSTableWriter sstableWriter,
-                                 int batchNumber,
-                                 int batchSize) throws IOException
+                                 int partitionId) throws IOException
     {
-        LOGGER.info("[{}][{}] Closing writer and scheduling SStable stream 
with {} rows",
-                    partitionId, batchNumber, batchSize);
+        if (sstableWriter == null)
+        {
+            LOGGER.warn("SSTableWriter is null. Nothing to finalize");
+            return;
+        }
+        LOGGER.info("[{}] Closing writer and scheduling SStable stream",
+                    partitionId);
         sstableWriter.close(writerContext, partitionId);
         streamSession.scheduleStream(sstableWriter);
+        sstableWriter = null;
     }
 
     private StreamSession createStreamSession(TaskContext taskContext)
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
index bc06e0f..18021f2 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriter.java
@@ -81,8 +81,7 @@ public class SSTableWriter
         writerContext.cluster().getPartitioner().toString(),
         tableSchema.createStatement,
         tableSchema.modificationStatement,
-        writerContext.job().getRowBufferMode(),
-        writerContext.job().getSstableDataSizeInMB());
+        writerContext.job().sstableDataSizeInMiB());
     }
 
     @NotNull
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
index 50c5d47..55cace3 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SSTableWriterFactory.java
@@ -22,7 +22,6 @@ package org.apache.cassandra.spark.bulkwriter;
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
 import org.apache.cassandra.bridge.CassandraVersionFeatures;
-import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.bridge.SSTableWriter;
 
 public final class SSTableWriterFactory
@@ -37,7 +36,6 @@ public final class SSTableWriterFactory
                                                  String partitioner,
                                                  String createStatement,
                                                  String insertStatement,
-                                                 RowBufferMode rowBufferMode,
                                                  int bufferSizeMB)
     {
         CassandraBridge cassandraBridge = 
CassandraBridgeFactory.get(serverVersion);
@@ -45,7 +43,6 @@ public final class SSTableWriterFactory
                                                 partitioner,
                                                 createStatement,
                                                 insertStatement,
-                                                rowBufferMode,
                                                 bufferSizeMB);
     }
 }
diff --git 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
index 075672f..3dd68dd 100644
--- 
a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
+++ 
b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/WriterOptions.java
@@ -27,7 +27,6 @@ public enum WriterOptions implements WriterOption
     BULK_WRITER_CL,
     LOCAL_DC,
     NUMBER_SPLITS,
-    BATCH_SIZE,
     COMMIT_THREADS_PER_INSTANCE,
     COMMIT_BATCH_SIZE,
     SKIP_EXTENDED_VERIFY,
@@ -41,8 +40,9 @@ public enum WriterOptions implements WriterOption
     TRUSTSTORE_PATH,
     TRUSTSTORE_BASE64_ENCODED,
     SIDECAR_PORT,
-    ROW_BUFFER_MODE,
+    @Deprecated // the size unit `MB` is incorrect, use 
`SSTABLE_DATA_SIZE_IN_MIB` instead
     SSTABLE_DATA_SIZE_IN_MB,
+    SSTABLE_DATA_SIZE_IN_MIB,
     TTL,
     TIMESTAMP,
     /**
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
index 56caa5b..b55045d 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConfTest.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Maps;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.spark.bulkwriter.util.SbwKryoRegistrator;
 import org.apache.cassandra.spark.utils.BuildInfo;
 import org.apache.spark.SparkConf;
@@ -215,59 +214,6 @@ public class BulkSparkConfTest
                      + "Please provide option " + 
WriterOptions.TRUSTSTORE_PASSWORD, npe.getMessage());
     }
 
-    @Test
-    public void testUnbufferedRowBufferMode()
-    {
-        Map<String, String> options = copyDefaultOptions();
-        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "UNBUFFERED");
-        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
-        assertNotNull(bulkSparkConf);
-        assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.UNBUFFERED);
-    }
-
-    @Test
-    public void testBufferedRowBufferMode()
-    {
-        Map<String, String> options = copyDefaultOptions();
-        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
-        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
-        assertNotNull(bulkSparkConf);
-        assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
-    }
-
-    @Test
-    public void testInvalidRowBufferMode()
-    {
-        Map<String, String> options = copyDefaultOptions();
-        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "invalid");
-        IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
-                                                          () -> new 
BulkSparkConf(sparkConf, options));
-        assertEquals("Key row buffering mode with value invalid is not a valid 
Enum of type class org.apache.cassandra.bridge.RowBufferMode.",
-                     exception.getMessage());
-    }
-
-    @Test
-    public void testBufferedRowBufferModeWithZeroBatchSize()
-    {
-        Map<String, String> options = copyDefaultOptions();
-        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
-        options.put(WriterOptions.BATCH_SIZE.name(), "0");
-        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
-        assertNotNull(bulkSparkConf);
-        assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
-    }
-
-    @Test
-    public void testNonZeroBatchSizeIsIgnoredWithBufferedRowBufferMode()
-    {
-        Map<String, String> options = copyDefaultOptions();
-        options.put(WriterOptions.BATCH_SIZE.name(), "5");
-        options.put(WriterOptions.ROW_BUFFER_MODE.name(), "BUFFERED");
-        BulkSparkConf bulkSparkConf = new BulkSparkConf(sparkConf, options);
-        assertNotNull(bulkSparkConf);
-        assertEquals(bulkSparkConf.rowBufferMode, RowBufferMode.BUFFERED);
-    }
-
     @Test
     void testQuoteIdentifiers()
     {
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 1f589da..e864dac 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -40,7 +40,6 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
-import org.apache.cassandra.bridge.RowBufferMode;
 import org.apache.cassandra.sidecar.common.data.TimeSkewResponse;
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
@@ -56,7 +55,6 @@ import 
org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.validation.StartupValidator;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
-import org.jetbrains.annotations.NotNull;
 
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.DATE;
 import static org.apache.cassandra.spark.bulkwriter.SqlToCqlTypeConverter.INT;
@@ -74,7 +72,6 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
     new org.apache.spark.sql.types.DataType[]{DataTypes.IntegerType, 
DataTypes.DateType, DataTypes.StringType, DataTypes.IntegerType},
     new CqlField.CqlType[]{mockCqlType(INT), mockCqlType(DATE), 
mockCqlType(VARCHAR), mockCqlType(INT)});
     private final boolean quoteIdentifiers;
-    private RowBufferMode rowBufferMode = RowBufferMode.UNBUFFERED;
     private ConsistencyLevel.CL consistencyLevel;
     private int sstableDataSizeInMB = 128;
     private int sstableWriteBatchSize = 2;
@@ -219,20 +216,8 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
         return "DC1";
     }
 
-    @NotNull
     @Override
-    public RowBufferMode getRowBufferMode()
-    {
-        return rowBufferMode;
-    }
-
-    public void setRowBufferMode(RowBufferMode rowBufferMode)
-    {
-        this.rowBufferMode = rowBufferMode;
-    }
-
-    @Override
-    public int getSstableDataSizeInMB()
+    public int sstableDataSizeInMiB()
     {
         return sstableDataSizeInMB;
     }
@@ -243,19 +228,6 @@ public class MockBulkWriterContext implements 
BulkWriterContext, ClusterInfo, Jo
         this.sstableDataSizeInMB = sstableDataSizeInMB;
     }
 
-    @Override
-    public int getSstableBatchSize()
-    {
-        return sstableWriteBatchSize;
-    }
-
-    @VisibleForTesting
-    void setSstableWriteBatchSize(int sstableWriteBatchSize)
-    {
-        this.sstableWriteBatchSize = sstableWriteBatchSize;
-    }
-
-    @Override
     public int getCommitBatchSize()
     {
         return 1;
diff --git 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index b2fa210..92fda4c 100644
--- 
a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ 
b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -76,10 +76,9 @@ public class RecordWriterTest
 {
     private static final int REPLICA_COUNT = 3;
     private static final int FILES_PER_SSTABLE = 8;
-    // writing 5 rows with batch size of 2 should produce 3 sstable
-    private static final int UPLOADED_SSTABLES = 3;
-    private static final int ROWS_COUNT = 5;
-    private static final int BATCH_SIZE = 2;
+    // writing 270 rows with sstable size cap of 1 MB should produce 2 sstable
+    private static final int UPLOADED_SSTABLES = 2;
+    private static final int ROWS_COUNT = 270;
     private static final String[] COLUMN_NAMES = {
     "id", "date", "course", "marks"
     };
@@ -101,7 +100,7 @@ public class RecordWriterTest
         tw = new MockTableWriter(folder.getRoot());
         tokenRangeMapping = 
TokenRangeMappingUtils.buildTokenRangeMapping(100000, ImmutableMap.of("DC1", 
3), 12);
         writerContext = new MockBulkWriterContext(tokenRangeMapping);
-        writerContext.setSstableWriteBatchSize(BATCH_SIZE); // create a new 
sstable after writing 2 rows
+        writerContext.setSstableDataSizeInMB(1); // 1 is the minimum sstable data size allowed to be set
         tc = new TestTaskContext();
         range = 
writerContext.job().getTokenPartitioner().getTokenRange(tc.partitionId());
         tokenizer = new Tokenizer(writerContext);
@@ -138,7 +137,7 @@ public class RecordWriterTest
         rw = new RecordWriter(m, COLUMN_NAMES, () -> tc, SSTableWriter::new);
 
         
when(m.getTokenRangeMapping(false)).thenCallRealMethod().thenReturn(testMapping);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
         assertThat(ex.getMessage(), containsString("Token range mappings have 
changed since the task started"));
     }
@@ -168,7 +167,7 @@ public class RecordWriterTest
 
         when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping);
         when(m.getInstanceAvailability()).thenReturn(availability);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         rw.write(data);
         Map<CassandraInstance, List<UploadRequest>> uploads = 
writerContext.getUploads();
         // Should not upload to blocked instances
@@ -189,7 +188,7 @@ public class RecordWriterTest
 
         when(m.getTokenRangeMapping(anyBoolean())).thenReturn(testMapping);
         when(m.getInstanceAvailability()).thenCallRealMethod();
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         rw.write(data);
         Map<CassandraInstance, List<UploadRequest>> uploads = 
writerContext.getUploads();
         assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should 
upload to 3 replicas
@@ -198,7 +197,7 @@ public class RecordWriterTest
     @Test
     public void testSuccessfulWrite() throws InterruptedException
     {
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
     }
 
@@ -221,7 +220,8 @@ public class RecordWriterTest
                                                                         pk,
                                                                         pk,
                                                                         
quoteIdentifiers);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        writerContext.setSstableDataSizeInMB(1);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         validateSuccessfulWrite(writerContext, data, columnNames);
     }
 
@@ -229,7 +229,7 @@ public class RecordWriterTest
     public void testSuccessfulWriteCheckUploads()
     {
         rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, 
SSTableWriter::new);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         rw.write(data);
         Map<CassandraInstance, List<UploadRequest>> uploads = 
writerContext.getUploads();
         assertThat(uploads.keySet().size(), is(REPLICA_COUNT));  // Should 
upload to 3 replicas
@@ -244,53 +244,48 @@ public class RecordWriterTest
     @Test
     public void testWriteWithConstantTTL() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, 
false, false);
-        validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, 
false);
+        validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
     }
 
     @Test
     public void testWriteWithTTLColumn() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, 
true, false);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, 
false);
         String[] columnNamesWithTtl =
         {
         "id", "date", "course", "marks", "ttl"
         };
-        validateSuccessfulWrite(bulkWriterContext, data, columnNamesWithTtl);
+        validateSuccessfulWrite(writerContext, data, columnNamesWithTtl);
     }
 
     @Test
     public void testWriteWithConstantTimestamp() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, 
false, false);
-        validateSuccessfulWrite(bulkWriterContext, data, COLUMN_NAMES);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, 
false);
+        validateSuccessfulWrite(writerContext, data, COLUMN_NAMES);
     }
 
     @Test
     public void testWriteWithTimestampColumn() throws InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, 
false, true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false, 
true);
         String[] columnNamesWithTimestamp =
         {
         "id", "date", "course", "marks", "timestamp"
         };
-        validateSuccessfulWrite(bulkWriterContext, data, 
columnNamesWithTimestamp);
+        validateSuccessfulWrite(writerContext, data, columnNamesWithTimestamp);
     }
 
     @Test
     public void testWriteWithTimestampAndTTLColumn() throws 
InterruptedException
     {
-        MockBulkWriterContext bulkWriterContext = new 
MockBulkWriterContext(tokenRangeMapping);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, 
true, true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true, 
true);
         String[] columnNames =
         {
         "id", "date", "course", "marks", "ttl", "timestamp"
         };
-        validateSuccessfulWrite(bulkWriterContext, data, columnNames);
+        validateSuccessfulWrite(writerContext, data, columnNames);
     }
 
     @Test
@@ -390,7 +385,7 @@ public class RecordWriterTest
     public void testCorruptSSTable()
     {
         rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path) -> new SSTableWriter(tw.setOutDir(path), path));
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         // TODO: Add better error handling with human-readable exception 
messages in SSTableReader::new
         // That way we can assert on the exception thrown here
         RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
@@ -399,9 +394,8 @@ public class RecordWriterTest
     @Test
     public void testWriteWithOutOfRangeTokenFails()
     {
-        writerContext.setSstableWriteBatchSize(ROWS_COUNT + 100); // Write all 
rows in the same sstable
         rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path) -> new SSTableWriter(tw, folder));
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(false);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(5, 
Range.all(), false, false, false);
         RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
         String expectedErr = "java.lang.IllegalStateException: Received Token 
" +
                              "5765203080415074583 outside the expected ranges 
[(-9223372036854775808‥100000]]";
@@ -413,7 +407,7 @@ public class RecordWriterTest
     {
         rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path) -> new SSTableWriter(tw, folder));
         tw.setAddRowThrows(true);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
         assertEquals("java.lang.RuntimeException: Failed to write because 
addRow throws", ex.getMessage());
     }
@@ -425,7 +419,7 @@ public class RecordWriterTest
         long sixtyOneMinutesInMillis = TimeUnit.MINUTES.toMillis(61);
         rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, (wc, 
path) -> new SSTableWriter(tw, folder));
         writerContext.setTimeProvider(() -> System.currentTimeMillis() - 
sixtyOneMinutesInMillis);
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         RuntimeException ex = assertThrows(RuntimeException.class, () -> 
rw.write(data));
         assertThat(ex.getMessage(), startsWith("Time skew between Spark and 
Cassandra is too large. Allowable skew is 60 minutes. Spark executor time is 
"));
     }
@@ -438,7 +432,7 @@ public class RecordWriterTest
         long remoteTime = System.currentTimeMillis() - 
fiftyNineMinutesInMillis;
         rw = new RecordWriter(writerContext, COLUMN_NAMES, () -> tc, 
SSTableWriter::new);
         writerContext.setTimeProvider(() -> remoteTime);  // Return a very low 
"current time" to make sure we fail if skew is too bad
-        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData(true);
+        Iterator<Tuple2<DecoratedKey, Object[]>> data = generateData();
         rw.write(data);
     }
 
@@ -473,19 +467,16 @@ public class RecordWriterTest
         }
     }
 
-    private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(boolean 
onlyInRange)
+    private Iterator<Tuple2<DecoratedKey, Object[]>> generateData()
     {
-        return generateData(onlyInRange, false, false);
+        return generateData(false, false);
     }
 
-    private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(boolean 
onlyInRange, boolean withTTL, boolean withTimestamp)
+    private Iterator<Tuple2<DecoratedKey, Object[]>> generateData(boolean 
withTTL, boolean withTimestamp)
     {
-        return onlyInRange
-               ? generateData(ROWS_COUNT, range, false, withTTL, 
withTimestamp) // accept only tokens in range
-               : generateData(ROWS_COUNT, Range.all(), false, withTTL, 
withTimestamp); // accept all tokens
+        return generateData(ROWS_COUNT, range, false, withTTL, withTimestamp);
     }
 
-
     // generate data with fake tokens assigend. The fake tokens are provided 
by the input range.
     // Although the data generated have fake tokens, the actual tokens 
computed from each tuple
     // are still ordered ascendingly.
@@ -508,34 +499,35 @@ public class RecordWriterTest
                                                                   boolean 
fakeTokens,
                                                                   boolean 
withTTL, boolean withTimestamp)
     {
+        String courseString = IntStream.range(0, 100000).boxed().map(i -> 
"Long long string").collect(Collectors.joining());
         Stream<Tuple2<DecoratedKey, Object[]>> source = IntStream.iterate(0, 
integer -> integer + 1).mapToObj(index -> {
             Object[] columns;
             if (withTTL && withTimestamp)
             {
                 columns = new Object[]
                           {
-                          index, index, "foo" + index, index, index * 100, 
System.currentTimeMillis() * 1000
+                          index, index, courseString, index, index * 100, 
System.currentTimeMillis() * 1000
                           };
             }
             else if (withTimestamp)
             {
                 columns = new Object[]
                           {
-                          index, index, "foo" + index, index, 
System.currentTimeMillis() * 1000
+                          index, index, courseString, index, 
System.currentTimeMillis() * 1000
                           };
             }
             else if (withTTL)
             {
                 columns = new Object[]
                           {
-                          index, index, "foo" + index, index, index * 100
+                          index, index, courseString, index, index * 100
                           };
             }
             else
             {
                 columns = new Object[]
                           {
-                          index, index, "foo" + index, index
+                          index, index, courseString, index
                           };
             }
             return Tuple2.apply(tokenizer.getDecoratedKey(columns), columns);
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index cfc7057..6c0240c 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -390,7 +390,6 @@ public abstract class CassandraBridge
                                                    String partitioner,
                                                    String createStatement,
                                                    String insertStatement,
-                                                   RowBufferMode rowBufferMode,
                                                    int bufferSizeMB);
 
     public interface IRow
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java 
b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java
deleted file mode 100644
index 70be08c..0000000
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/RowBufferMode.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.bridge;
-
-/**
- * Configures how data is flushed to an SSTable
- */
-public enum RowBufferMode
-{
-    /**
-     * In this mode, Cassandra will flush an SSTable to disk once it reaches 
the configured BufferSizeInMB.
-     * This parameter is configured by the user-configurable 
SSTABLE_DATA_SIZE_IN_MB WriterOption. Note:
-     * This is the uncompressed size of data before being written to disk, and 
the actual size of an SSTable
-     * can be smaller based on the compression configuration for the SSTable 
and how compressible the data is.
-     */
-    BUFFERED,
-
-    /**
-     * Cassandra expects rows in sorted order and will not flush an SSTable 
automatically. The size of an
-     * SSTable is based on the number of rows we write to the SSTable. This 
parameter is configured by the
-     * user-configurable BATCH_SIZE WriterOption.
-     */
-    UNBUFFERED
-}
diff --git a/cassandra-four-zero/build.gradle b/cassandra-four-zero/build.gradle
index b93e8fb..9b90451 100644
--- a/cassandra-four-zero/build.gradle
+++ b/cassandra-four-zero/build.gradle
@@ -29,7 +29,7 @@ project(':cassandra-four-zero') {
         compileOnly(group: "${sparkGroupId}", name: 
"spark-core_${scalaMajorVersion}", version: 
"${project.rootProject.sparkVersion}")
         compileOnly(group: "${sparkGroupId}", name: 
"spark-sql_${scalaMajorVersion}", version: 
"${project.rootProject.sparkVersion}")
 
-        implementation(group: 'org.apache.cassandra', name: 'cassandra-all', 
version: '4.0.2') {
+        implementation(group: 'org.apache.cassandra', name: 'cassandra-all', 
version: '4.0.12') {
             // Exclude JNA libraries from the cassandra-all dependency tree 
because Spark has its own version
             // and trying to load two different versions causes issues with 
the native libraries
             exclude(group: 'net.java.dev.jna')
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 68a896c..f1274ad 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -51,8 +51,10 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.ParameterizedClass;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.commitlog.CommitLogSegmentManagerStandard;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.db.marshal.CompositeType;
@@ -164,6 +166,7 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
                 throw new RuntimeException(exception);
             }
             config.data_file_directories = new 
String[]{tempDirectory.toString()};
+            setupCommitLogConfigs(tempDirectory);
             DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch());
             Keyspace.setInitialized();
 
@@ -171,6 +174,23 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
         }
     }
 
+    public static void setupCommitLogConfigs(Path path)
+    {
+        String commitLogPath = path + "/commitlog";
+        DatabaseDescriptor.getRawConfig().commitlog_directory = commitLogPath;
+        DatabaseDescriptor.getRawConfig().hints_directory = path + "/hints";
+        DatabaseDescriptor.getRawConfig().saved_caches_directory = path + 
"/saved_caches";
+        DatabaseDescriptor.setCommitLogSync(Config.CommitLogSync.periodic);
+        DatabaseDescriptor.setCommitLogCompression(new 
ParameterizedClass("LZ4Compressor",
+                                                                          
Collections.emptyMap()));
+        DatabaseDescriptor.setCommitLogSyncPeriod(30);
+        DatabaseDescriptor.setCommitLogMaxCompressionBuffersPerPool(3);
+        DatabaseDescriptor.setCommitLogSyncGroupWindow(30);
+        DatabaseDescriptor.setCommitLogSegmentSize(32);
+        DatabaseDescriptor.getRawConfig().commitlog_total_space_in_mb = 1024;
+        DatabaseDescriptor.setCommitLogSegmentMgrProvider(commitLog -> new 
CommitLogSegmentManagerStandard(commitLog, commitLogPath));
+    }
+
     public CassandraBridgeImplementation()
     {
         // Cassandra-version-specific Kryo serializers
@@ -581,10 +601,9 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
                                           String partitioner,
                                           String createStatement,
                                           String insertStatement,
-                                          RowBufferMode rowBufferMode,
                                           int bufferSizeMB)
     {
-        return new SSTableWriterImplementation(inDirectory, partitioner, 
createStatement, insertStatement, rowBufferMode, bufferSizeMB);
+        return new SSTableWriterImplementation(inDirectory, partitioner, 
createStatement, insertStatement, bufferSizeMB);
     }
 
     // Version-Specific Test Utility Methods
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
index facfad6..89e14f1 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java
@@ -44,7 +44,6 @@ public class SSTableWriterImplementation implements 
SSTableWriter
                                        String partitioner,
                                        String createStatement,
                                        String insertStatement,
-                                       RowBufferMode rowBufferMode,
                                        int bufferSizeMB)
     {
         IPartitioner cassPartitioner = 
partitioner.toLowerCase().contains("random") ? new RandomPartitioner()
@@ -53,7 +52,6 @@ public class SSTableWriterImplementation implements 
SSTableWriter
         CQLSSTableWriter.Builder builder = configureBuilder(inDirectory,
                                                             createStatement,
                                                             insertStatement,
-                                                            rowBufferMode,
                                                             bufferSizeMB,
                                                             cassPartitioner);
         // TODO: Remove me once CQLSSTableWriter.Builder synchronize on schema 
(see CASSANDRA-TBD)
@@ -84,23 +82,18 @@ public class SSTableWriterImplementation implements 
SSTableWriter
     static CQLSSTableWriter.Builder configureBuilder(String inDirectory,
                                                      String createStatement,
                                                      String insertStatement,
-                                                     RowBufferMode 
rowBufferMode,
                                                      int bufferSizeMB,
                                                      IPartitioner 
cassPartitioner)
     {
-        CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
-                                                           
.inDirectory(inDirectory)
-                                                           
.forTable(createStatement)
-                                                           
.withPartitioner(cassPartitioner)
-                                                           
.using(insertStatement);
-        if (rowBufferMode == RowBufferMode.UNBUFFERED)
-        {
-            builder.sorted();
-        }
-        else if (rowBufferMode == RowBufferMode.BUFFERED)
-        {
-            builder.withBufferSizeInMB(bufferSizeMB);
-        }
-        return builder;
+        return CQLSSTableWriter
+               .builder()
+               .inDirectory(inDirectory)
+               .forTable(createStatement)
+               .withPartitioner(cassPartitioner)
+               .using(insertStatement)
+               // The data frame to write is always sorted,
+               // see 
org.apache.cassandra.spark.bulkwriter.CassandraBulkSourceRelation.insert
+               .sorted()
+               .withMaxSSTableSizeInMiB(bufferSizeMB);
     }
 }
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
index aa39324..32ff705 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/io/sstable/SSTableTombstoneWriter.java
@@ -32,6 +32,8 @@ import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.bridge.CassandraSchema;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnSpecification;
@@ -70,7 +72,9 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Re-write of CQLSSTableWriter for writing tombstones to an SSTable for 
testing
+ * Used for testing purposes only
  */
+@VisibleForTesting
 public final class SSTableTombstoneWriter implements Closeable
 {
     private static final ByteBuffer UNSET_VALUE = 
ByteBufferUtil.UNSET_BYTE_BUFFER;
@@ -287,7 +291,6 @@ public final class SSTableTombstoneWriter implements 
Closeable
         private ModificationStatement.Parsed deleteStatement;
         private IPartitioner partitioner;
 
-        private boolean sorted = false;
         private long bufferSizeInMB = 128;
 
         Builder()
@@ -440,9 +443,9 @@ public final class SSTableTombstoneWriter implements 
Closeable
 
             DeleteStatement preparedDelete = prepareDelete();
             TableMetadataRef ref = 
TableMetadataRef.forOfflineTools(tableMetadata);
-            AbstractSSTableSimpleWriter writer = sorted
-                    ? new SSTableSimpleWriter(directory, ref, 
preparedDelete.updatedColumns())
-                    : new SSTableSimpleUnsortedWriter(directory, ref, 
preparedDelete.updatedColumns(), bufferSizeInMB);
+            AbstractSSTableSimpleWriter writer = new 
SSTableSimpleUnsortedWriter(directory, ref,
+                                                                               
  preparedDelete.updatedColumns(),
+                                                                               
  bufferSizeInMB);
 
             if (formatType != null)
             {
diff --git 
a/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java
 
b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java
index 6ede1d0..e5e5846 100644
--- 
a/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java
+++ 
b/cassandra-four-zero/src/test/java/org/apache/cassandra/bridge/SSTableWriterImplementationTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.bridge;
 
 import java.io.File;
 import java.lang.reflect.Field;
+import java.util.Arrays;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -30,8 +31,6 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.utils.ReflectionUtils;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -46,32 +45,16 @@ class SSTableWriterImplementationTest
     File writeDirectory;
 
     @Test
-    void testUnbufferedRowBufferModeConfiguration() throws 
NoSuchFieldException, IllegalAccessException
+    void testSSTableWriterConfiguration() throws NoSuchFieldException, 
IllegalAccessException
     {
         CQLSSTableWriter.Builder builder = 
SSTableWriterImplementation.configureBuilder(writeDirectory.getAbsolutePath(),
                                                                                
         CREATE_STATEMENT,
                                                                                
         INSERT_STATEMENT,
-                                                                               
         RowBufferMode.UNBUFFERED,
                                                                                
         250,
                                                                                
         new Murmur3Partitioner());
 
 
         assertTrue(peekSorted(builder));
-        assertNotEquals(250, peekBufferSizeInMB(builder)); // 250 should not 
be set
-    }
-
-    @Test
-    void testBufferedRowBufferModeConfiguration() throws NoSuchFieldException, 
IllegalAccessException
-    {
-        CQLSSTableWriter.Builder builder = 
SSTableWriterImplementation.configureBuilder(writeDirectory.getAbsolutePath(),
-                                                                               
         CREATE_STATEMENT,
-                                                                               
         INSERT_STATEMENT,
-                                                                               
         RowBufferMode.BUFFERED,
-                                                                               
         250,
-                                                                               
         new Murmur3Partitioner());
-
-
-        assertFalse(peekSorted(builder));
         assertEquals(250, peekBufferSizeInMB(builder));
     }
 
@@ -84,20 +67,34 @@ class SSTableWriterImplementationTest
 
     static long peekBufferSizeInMB(CQLSSTableWriter.Builder builder) throws 
NoSuchFieldException, IllegalAccessException
     {
-        Field bufferSizeInMBField;
-        try
+        // The name of the size field has been changed in the Cassandra code base.
+        // We find the field by trying the old name first, then the newer names.
+        Field sizeField = findFirstField(builder.getClass(),
+                                         "bufferSizeInMB", "bufferSizeInMiB", 
"maxSSTableSizeInMiB");
+        sizeField.setAccessible(true);
+        return (long) sizeField.get(builder);
+    }
+
+    static Field findFirstField(Class<?> clazz, String... fieldNames) throws 
NoSuchFieldException, IllegalAccessException
+    {
+        Field field = null;
+        for (String fieldName : fieldNames)
         {
-            bufferSizeInMBField = ReflectionUtils.getField(builder.getClass(), 
"bufferSizeInMB");
+            try
+            {
+                field = ReflectionUtils.getField(clazz, fieldName);
+            }
+            catch (NoSuchFieldException nsfe)
+            {
+                // ignore the exception and try with the next fieldName
+            }
         }
-        catch (NoSuchFieldException noSuchFieldException)
+
+        if (field == null)
         {
-            // The bufferSizeInMB field has been renamed to bufferSizeInMiB in 
trunk, so we expect this to
-            // fail at some point, and we have a way to recover from the 
failure without causing the test
-            // to fail.
-            bufferSizeInMBField = ReflectionUtils.getField(builder.getClass(), 
"bufferSizeInMiB");
+            throw new NoSuchFieldException("The class does not contain any of 
the supplied fieldNames: " + Arrays.asList(fieldNames));
         }
 
-        bufferSizeInMBField.setAccessible(true);
-        return (long) bufferSizeInMBField.get(builder);
+        return field;
     }
 }
diff --git a/scripts/build-dtest-jars.sh b/scripts/build-dtest-jars.sh
index 40e461a..f0ada1e 100755
--- a/scripts/build-dtest-jars.sh
+++ b/scripts/build-dtest-jars.sh
@@ -38,8 +38,8 @@ else
   #   "cassandra-4.0:cassandra-4.0"
   # Due to MacOS being stuck on Bash < 4, we don't use associative arrays here.
   CANDIDATE_BRANCHES=(
-    "cassandra-4.0:1f79c8492528f01bcc5f88951a1cc9e0d7265c54"
-    "cassandra-4.1:725655dda2776fef35567496a6e331102eb7610d"
+    "cassandra-4.0:cassandra-4.0.12"
+    "cassandra-4.1:99d9faeef57c9cf5240d11eac9db5b283e45a4f9"
   )
   BRANCHES=( ${BRANCHES:-cassandra-4.0 cassandra-4.1} )
   echo ${BRANCHES[*]}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
