This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 46c35d0  CASSANDRA-19411: Bulk reader fails to produce a row when regular column values are null
46c35d0 is described below

commit 46c35d0ef2efb66512133a7913df9936b0a80dc8
Author: Francisco Guerrero <fran...@apache.org>
AuthorDate: Mon Feb 19 20:50:16 2024 -0800

    CASSANDRA-19411: Bulk reader fails to produce a row when regular column values are null
    
    Bulk Reader won't emit a row when the regular column values are all `null`.
    For example, consider a schema with `PK` = `a`, `b`; `CK` = `c`, `d`; and
    regular columns `e`, `f`:
    
    |  a  |  b  |  c  |  d  |  e   |  f   |
    | --- | --- | --- | --- | ---- | ---- |
    | pk1 | pk2 | ck1 | ck2 | null | null |
    
    When queried through the Analytics bulk reader, this partition produces no row.
    
    This issue also occurs when the projected regular column values are all
    `null`, even though other, non-projected columns may hold values.
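
    A minimal repro sketch using the test utilities extended in this patch (the
    builder calls mirror the new EndToEndTests cases below; `bridge` stands in
    for the CassandraBridge supplied by the test runner):

        Tester.builder(TestSchema.builder(bridge)
                                 .withPartitionKey("a", bridge.bigint())
                                 .withClusteringKey("c", bridge.aInt())
                                 .withColumn("e", bridge.text()))
              .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS) // zero rows come back without this fix
              .withNullRegularColumns() // write rows whose regular columns are all null
              .run();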
    
    Patch by Francisco Guerrero; Reviewed by Yifan Cai for CASSANDRA-19411
---
 CHANGES.txt                                        |   1 +
 .../spark/sparksql/AbstractSparkRowIterator.java   |   6 +-
 .../cassandra/spark/sparksql/FullRowBuilder.java   |  12 +-
 .../spark/sparksql/SparkCellIterator.java          |  51 +++++---
 .../main/spark2/.../sparksql/SparkRowIterator.java |   2 +-
 .../main/spark3/.../sparksql/SparkRowIterator.java |  16 +--
 .../org/apache/cassandra/spark/EndToEndTests.java  | 113 ++++++++++++++++
 .../java/org/apache/cassandra/spark/Tester.java    | 143 +++++++++------------
 .../cassandra/spark/reader/EmptyStreamScanner.java |   6 +
 .../cassandra/spark/reader/StreamScanner.java      |   5 +
 .../spark/reader/common/IndexIterator.java         |   6 +
 .../cassandra/spark/utils/test/TestSchema.java     |  28 +++-
 .../spark/reader/AbstractStreamScanner.java        |   6 +
 13 files changed, 271 insertions(+), 124 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9daa5f0..1472baf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Bulk reader fails to produce a row when regular column values are null (CASSANDRA-19411)
  * Use XXHash32 for digest calculation of SSTables (CASSANDRA-19369)
  * Startup Validation Failures when Checking Sidecar Connectivity (CASSANDRA-19377)
  * No longer need to synchronize on Schema.instance after Cassandra 4.0.12 (CASSANDRA-19351)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
index 7882fe9..cc2f810 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
@@ -49,8 +49,8 @@ abstract class AbstractSparkRowIterator
 
     protected final List<SchemaFeature> requestedFeatures;
     protected final CqlTable cqlTable;
-    protected final boolean noValueColumns;
     protected final StructType columnFilter;
+    protected final boolean hasProjectedValueColumns;
 
     private Cell cell = null;
     private InternalRow row = null;
@@ -67,7 +67,7 @@ abstract class AbstractSparkRowIterator
         this.stats.openedSparkRowIterator();
         this.openTimeNanos = System.nanoTime();
         this.requestedFeatures = dataLayer.requestedFeatures();
-        this.noValueColumns = it.noValueColumns();
+        this.hasProjectedValueColumns = it.hasProjectedValueColumns();
         this.builder = newBuilder();
     }
 
@@ -132,7 +132,7 @@ abstract class AbstractSparkRowIterator
 
             builder.onCell(cell);
 
-            if (!noValueColumns)
+            if (hasProjectedValueColumns)
             {
                 // If schema has value column
                 builder.copyValue(cell);
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
index 823fdbe..5a06343 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
@@ -35,18 +35,18 @@ class FullRowBuilder implements RowBuilder
     static final Object[] EMPTY_RESULT = new Object[0];
     final int numColumns;
     final int numCells;
-    final boolean noValueColumns;
+    final boolean hasProjectedValueColumns;
     int extraColumns;
     Object[] result;
     int count;
     private final CqlTable cqlTable;
 
-    FullRowBuilder(CqlTable cqlTable, boolean noValueColumns)
+    FullRowBuilder(CqlTable cqlTable, boolean hasProjectedValueColumns)
     {
         this.cqlTable = cqlTable;
         this.numColumns = cqlTable.numFields();
-        this.numCells = cqlTable.numNonValueColumns() + (noValueColumns ? 0 : 1);
-        this.noValueColumns = noValueColumns;
+        this.hasProjectedValueColumns = hasProjectedValueColumns;
+        this.numCells = cqlTable.numNonValueColumns() + (hasProjectedValueColumns ? 1 : 0);
     }
 
     @Override
@@ -80,7 +80,7 @@ class FullRowBuilder implements RowBuilder
     public void copyKeys(Cell cell)
     {
         // Need to handle special case where schema is only partition or clustering keys - i.e. no value columns
-        int length = noValueColumns ? cell.values.length : cell.values.length - 1;
+        int length = !hasProjectedValueColumns ? cell.values.length : cell.values.length - 1;
         System.arraycopy(cell.values, 0, result, 0, length);
         count += length;
     }
@@ -108,7 +108,7 @@ class FullRowBuilder implements RowBuilder
     @Override
     public boolean hasRegularValueColumn()
     {
-        return !noValueColumns;
+        return hasProjectedValueColumns;
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
index c054ebc..58cb424 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -57,7 +56,6 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
     private final Stats stats;
     private final CqlTable cqlTable;
     private final Object[] values;
-    private final boolean noValueColumns;
     @Nullable
     protected final PruneColumnFilter columnFilter;
     private final long startTimeNanos;
@@ -74,6 +72,8 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
     private long previousTimeNanos;
 
     protected final int partitionId;
+    protected final int firstProjectedValueColumnPositionOrZero;
+    protected final boolean hasProjectedValueColumns;
 
     public SparkCellIterator(int partitionId,
                              @NotNull DataLayer dataLayer,
@@ -88,19 +88,16 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         if (columnFilter != null)
         {
             LOGGER.info("Adding prune column filter columns='{}'", 
String.join(",", columnFilter.requiredColumns()));
-
-            // If we are reading only partition/clustering keys or static 
columns, no value columns
-            Set<String> valueColumns = 
cqlTable.valueColumns().stream().map(CqlField::name).collect(Collectors.toSet());
-            noValueColumns = 
columnFilter.requiredColumns().stream().noneMatch(valueColumns::contains);
-        }
-        else
-        {
-            noValueColumns = cqlTable.numValueColumns() == 0;
         }
 
+        hasProjectedValueColumns = cqlTable.numValueColumns() > 0 &&
+                                   cqlTable.valueColumns()
+                                           .stream()
+                                           .anyMatch(field -> columnFilter == null || columnFilter.requiredColumns().contains(field.name()));
+
         // The value array copies across all the partition/clustering/static columns
         // and the single column value for this cell to the SparkRowIterator
-        values = new Object[cqlTable.numNonValueColumns() + (noValueColumns ? 0 : 1)];
+        values = new Object[cqlTable.numNonValueColumns() + (hasProjectedValueColumns ? 1 : 0)];
 
         // Open compaction scanner
         startTimeNanos = System.nanoTime();
@@ -111,6 +108,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         stats.openedCompactionScanner(openTimeNanos);
         rid = scanner.rid();
         stats.openedSparkCellIterator();
+        firstProjectedValueColumnPositionOrZero = maybeGetPositionOfFirstProjectedValueColumnOrZero();
     }
 
     protected StreamScanner<Rid> openScanner(int partitionId,
@@ -130,9 +128,9 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
                : null;
     }
 
-    public boolean noValueColumns()
+    public boolean hasProjectedValueColumns()
     {
-        return noValueColumns;
+        return hasProjectedValueColumns;
     }
 
     @Override
@@ -196,10 +194,18 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
             String columnName = component != null ? ByteBufferUtils.stringThrowRuntime(component) : null;
             if (columnName == null || columnName.isEmpty())
             {
-                if (noValueColumns)
+                if (!hasProjectedValueColumns || !scanner.hasMoreColumns())
                 {
-                    // Special case where schema consists only of partition keys, clustering keys or static columns, no value columns
-                    next = new Cell(values, 0, newRow, rid.getTimestamp());
+                    if (hasProjectedValueColumns)
+                    {
+                        // null out the value of the cell for the case where we have projected value columns
+                        values[values.length - 1] = null;
+                    }
+                    // We use the position of a cell for a value column that is projected, or zero if no value
+                    // columns are projected. The column we find is irrelevant because if we fall under this
+                    // condition it means that we are in a situation where the row has only PK + CK, but no
+                    // regular columns.
+                    next = new Cell(values, firstProjectedValueColumnPositionOrZero, newRow, rid.getTimestamp());
                     return true;
                 }
 
@@ -359,4 +365,17 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         stats.fieldDeserialization(field, System.nanoTime() - now);
         return value;
     }
+
+    private int maybeGetPositionOfFirstProjectedValueColumnOrZero()
+    {
+        // find the position of the first value column that is projected
+        for (CqlField valueColumn : cqlTable.valueColumns())
+        {
+            if (columnFilter == null || columnFilter.includeColumn(valueColumn.name()))
+            {
+                return valueColumn.position();
+            }
+        }
+        return 0;
+    }
 }
diff --git a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
index 26e560e..d77d980 100644
--- a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
@@ -56,7 +56,7 @@ public class SparkRowIterator extends AbstractSparkRowIterator implements InputP
     @NotNull
     RowBuilder newBuilder()
     {
-        RowBuilder builder = new FullRowBuilder(cqlTable, noValueColumns);
+        RowBuilder builder = new FullRowBuilder(cqlTable, hasProjectedValueColumns);
         for (SchemaFeature feature : requestedFeatures)
         {
             builder = feature.decorate(builder);
diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
index 118574d..c4c786b 100644
--- a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
@@ -67,12 +67,12 @@ public class SparkRowIterator extends AbstractSparkRowIterator implements Partit
         String[] fieldNames = null;
         if (columnFilter != null)
         {
-            builder = new PartialRowBuilder(columnFilter, cqlTable, noValueColumns);
+            builder = new PartialRowBuilder(columnFilter, cqlTable, hasProjectedValueColumns);
             fieldNames = columnFilter.fieldNames();
         }
         else
         {
-            builder = new FullRowBuilder(cqlTable, noValueColumns);
+            builder = new FullRowBuilder(cqlTable, hasProjectedValueColumns);
         }
 
         for (SchemaFeature feature : requestedFeatures)
@@ -102,17 +102,17 @@ public class SparkRowIterator extends AbstractSparkRowIterator implements Partit
 
         PartialRowBuilder(@NotNull StructType requiredSchema,
                           CqlTable table,
-                          boolean noValueColumns)
+                          boolean hasProjectedValueColumns)
         {
-            super(table, noValueColumns);
+            super(table, hasProjectedValueColumns);
             this.requiredSchema = requiredSchema;
             Set<String> requiredColumns = Arrays.stream(requiredSchema.fields())
                                                 .map(StructField::name)
                                                 .collect(Collectors.toSet());
             hasAllNonValueColumns = table.fields().stream()
-                                                   .filter(CqlField::isNonValueColumn)
-                                                   .map(CqlField::name)
-                                                   .allMatch(requiredColumns::contains);
+                                         .filter(CqlField::isNonValueColumn)
+                                         .map(CqlField::name)
+                                         .allMatch(requiredColumns::contains);
 
             // Map original column position to new position in requiredSchema
             positionsMap = IntStream.range(0, table.numFields())
@@ -161,7 +161,7 @@ public class SparkRowIterator extends AbstractSparkRowIterator implements Partit
             }
 
             // Otherwise we need to only return columns requested and map to new position in result array
-            int length = noValueColumns ? cell.values.length : cell.values.length - 1;
+            int length = !hasProjectedValueColumns ? cell.values.length : cell.values.length - 1;
             for (int index = 0; index < length; index++)
             {
                 int position = positionsMap[index];
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
index 4dad7c3..018f56f 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
@@ -2524,4 +2524,117 @@ public class EndToEndTests
               .withColumns("Partition_Key_0", "Column_1") // PK is required 
for lookup of the inserted data
               .run();
     }
+
+    // null values in regular columns
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testSinglePartitionKeyWithNullValueColumn(CassandraBridge bridge)
+    {
+        Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.bigint())
+                                 .withColumn("c", bridge.text()))
+              .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
+              .withNullRegularColumns()
+              .run();
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testMultiplePartitionKeysWithNullValueColumn(CassandraBridge bridge)
+    {
+        Tester.builder(TestSchema.builder(bridge)
+                                 .withPartitionKey("a", bridge.bigint())
+                                 .withPartitionKey("b", bridge.text())
+                                 .withPartitionKey("d", bridge.aDouble())
+                                 .withColumn("c", bridge.text()))
+              .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
+              .withNullRegularColumns()
+              .run();
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testSinglePartitionAndClusteringKeyWithNullValueColumn(CassandraBridge bridge)
+    {
+        qt().forAll(TestUtils.cql3Type(bridge))
+            .checkAssert(clusteringKeyType ->
+                         Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.bigint())
+                                                  .withClusteringKey("b", clusteringKeyType)
+                                                  .withColumn("c", bridge.text()))
+                               .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
+                               .withNullRegularColumns()
+                               .run());
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testMultipleValueColumnsWithNullValueColumn(CassandraBridge bridge)
+    {
+        qt().forAll(TestUtils.cql3Type(bridge))
+            .checkAssert(clusteringKeyType ->
+                         Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.bigint())
+                                                  .withClusteringKey("b", clusteringKeyType)
+                                                  .withColumn("c", bridge.text())
+                                                  .withColumn("d", bridge.aInt())
+                                                  .withColumn("e", bridge.ascii())
+                                                  .withColumn("f", bridge.blob()))
+                               .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
+                               .withNullRegularColumns()
+                               .run());
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testExcludeSomeColumnsWithNullValueColumn(CassandraBridge bridge)
+    {
+        Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.bigint())
+                                 .withClusteringKey("b", bridge.aInt())
+                                 .withColumn("c", bridge.text())
+                                 .withColumn("d", bridge.aInt())
+                                 .withColumn("e", bridge.ascii())
+                                 .withColumn("f", bridge.blob()))
+              .withColumns("a", "b", "d")
+              .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
+              .withNullRegularColumns()
+              .run();
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testStaticColumnWithNullValueColumn(CassandraBridge bridge)
+    {
+        Tester.builder(TestSchema.builder(bridge)
+                                 .withPartitionKey("a", bridge.uuid())
+                                 .withClusteringKey("b", bridge.bigint())
+                                 .withStaticColumn("c", bridge.aInt())
+                                 .withColumn("d", bridge.text()))
+              .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS)
+              .withNullRegularColumns()
+              .run();
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges")
+    public void testNullValueColumnWithPushDownFilter(CassandraBridge bridge)
+    {
+        int numRows = 10;
+        Tester.builder(TestSchema.builder(bridge).withPartitionKey("a", bridge.aInt()).withColumn("b", bridge.aInt()))
+              .dontWriteRandomData()
+              .withSSTableWriter(writer -> {
+                  for (int i = 0; i < numRows; i++)
+                  {
+                      writer.write(i, null);
+                  }
+              })
+              .withFilter("a=1")
+              .withCheck((ds) -> {
+                  for (Row row : ds.collectAsList())
+                  {
+                      int a = row.getInt(0);
+                      assertEquals(1, a);
+                      assertNull(row.get(1));
+                  }
+              })
+              .run();
+    }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
index 43eb135..b14d744 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/Tester.java
@@ -91,46 +91,30 @@ public final class Tester
     private final int delayBetweenSSTablesInSecs;
     private final String statsClass;
     private final boolean upsert;
+    private final boolean nullifyValueColumn;
 
-    // CHECKSTYLE IGNORE: Constructor with many parameters
-    private Tester(@NotNull List<CassandraVersion> versions,
-                   @Nullable TestSchema.Builder schemaBuilder,
-                   @Nullable Function<String, TestSchema.Builder> schemaBuilderFunc,
-                   @NotNull List<Integer> numSSTables,
-                   @NotNull List<Consumer<TestSchema.TestRow>> writeListeners,
-                   @NotNull List<Consumer<TestSchema.TestRow>> readListeners,
-                   @NotNull List<Writer> writers,
-                   @NotNull List<Consumer<Dataset<Row>>> checks,
-                   @NotNull Set<String> sumFields,
-                   @Nullable Runnable reset,
-                   @Nullable String filterExpression,
-                   int numRandomRows, int expectedRowCount,
-                   boolean shouldCheckNumSSTables,
-                   @Nullable String[] columns,
-                   boolean addLastModifiedTimestamp,
-                   int delayBetweenSSTablesInSecs,
-                   @Nullable String statsClass,
-                   boolean upsert)
+    private Tester(Builder builder)
     {
-        this.versions = versions;
-        this.schemaBuilder = schemaBuilder;
-        this.schemaBuilderFunc = schemaBuilderFunc;
-        this.numSSTables = numSSTables;
-        this.writeListeners = writeListeners;
-        this.readListeners = readListeners;
-        this.writers = writers;
-        this.checks = checks;
-        this.sumFields = sumFields;
-        this.reset = reset;
-        this.filterExpression = filterExpression;
-        this.numRandomRows = numRandomRows;
-        this.expectedRowCount = expectedRowCount;
-        this.shouldCheckNumSSTables = shouldCheckNumSSTables;
-        this.columns = columns;
-        this.addLastModifiedTimestamp = addLastModifiedTimestamp;
-        this.delayBetweenSSTablesInSecs = delayBetweenSSTablesInSecs;
-        this.statsClass = statsClass;
-        this.upsert = upsert;
+        this.versions = builder.versions;
+        this.schemaBuilder = builder.schemaBuilder;
+        this.schemaBuilderFunc = builder.schemaBuilderFunc;
+        this.numSSTables = builder.numSSTables;
+        this.writeListeners = builder.writeListeners;
+        this.readListeners = builder.readListeners;
+        this.writers = builder.writers;
+        this.checks = builder.checks;
+        this.sumFields = builder.sumFields;
+        this.reset = builder.reset;
+        this.filterExpression = builder.filterExpression;
+        this.numRandomRows = builder.numRandomRows;
+        this.expectedRowCount = builder.expectedRowCount;
+        this.shouldCheckNumSSTables = builder.shouldCheckNumSSTables;
+        this.columns = builder.columns;
+        this.addLastModifiedTimestamp = builder.addLastModifiedTimestamp;
+        this.delayBetweenSSTablesInSecs = builder.delayBetweenSSTablesInSecs;
+        this.statsClass = builder.statsClass;
+        this.upsert = builder.upsert;
+        this.nullifyValueColumn = builder.nullRegularColumns;
     }
 
     static Builder builder(@NotNull TestSchema.Builder schemaBuilder)
@@ -194,6 +178,7 @@ public final class Tester
         private int delayBetweenSSTablesInSecs = 0;
         private String statsClass = null;
         private boolean upsert = false;
+        private boolean nullRegularColumns = false;
 
         private Builder(@NotNull TestSchema.Builder schemaBuilder)
         {
@@ -337,28 +322,16 @@ public final class Tester
             return this;
         }
 
+        public Builder withNullRegularColumns()
+        {
+            this.nullRegularColumns = true;
+            return this;
+        }
+
         void run()
         {
             Preconditions.checkArgument(schemaBuilder != null || schemaBuilderFunc != null);
-            new Tester(versions,
-                       schemaBuilder,
-                       schemaBuilderFunc,
-                       numSSTables,
-                       writeListeners,
-                       readListeners,
-                       writers,
-                       checks,
-                       sumFields,
-                       reset,
-                       filterExpression,
-                       numRandomRows,
-                       expectedRowCount,
-                       shouldCheckNumSSTables,
-                       columns,
-                       addLastModifiedTimestamp,
-                       delayBetweenSSTablesInSecs,
-                       statsClass,
-                       upsert).run();
+            new Tester(this).run();
         }
     }
 
@@ -392,34 +365,34 @@ public final class Tester
                                                     .collect(Collectors.toMap(Function.identity(),
                                                                               ignore -> new MutableLong()));
             Map<String, TestSchema.TestRow> rows = new HashMap<>(numRandomRows);
-            IntStream.range(0, numSSTables).forEach(ssTable ->
-                    schema.writeSSTable(directory, bridge, partitioner, upsert, writer ->
-                             IntStream.range(0, numRandomRows).forEach(row -> {
-                                 TestSchema.TestRow testRow;
-                                 do
-                                 {
-                                     testRow = schema.randomRow();
-                                 }
-                                 while (rows.containsKey(testRow.getKey()));  // Don't write duplicate rows
-
-                                 for (Consumer<TestSchema.TestRow> writeListener : writeListeners)
-                                 {
-                                     writeListener.accept(testRow);
-                                 }
-
-                                 for (String sumField : sumFields)
-                                 {
-                                     sum.get(sumField).add((Number) testRow.get(sumField));
-                                 }
-                                 rows.put(testRow.getKey(), testRow);
-
-                                 Object[] values = testRow.allValues();
-                                 if (upsert)
-                                 {
-                                     rotate(values, schema.partitionKeys.size() + schema.clusteringKeys.size());
-                                 }
-                                 writer.write(values);
-                             })));
+            IntStream.range(0, numSSTables).forEach(ssTable -> schema.writeSSTable(directory, bridge, partitioner, upsert, writer -> {
+                IntStream.range(0, numRandomRows).forEach(row -> {
+                    TestSchema.TestRow testRow;
+                    do
+                    {
+                        testRow = schema.randomRow(nullifyValueColumn);
+                    }
+                    while (rows.containsKey(testRow.getKey()));  // Don't write duplicate rows
+
+                    for (Consumer<TestSchema.TestRow> writeListener : writeListeners)
+                    {
+                        writeListener.accept(testRow);
+                    }
+
+                    for (String sumField : sumFields)
+                    {
+                        sum.get(sumField).add((Number) testRow.get(sumField));
+                    }
+                    rows.put(testRow.getKey(), testRow);
+
+                    Object[] values = testRow.allValues();
+                    if (upsert)
+                    {
+                        rotate(values, schema.partitionKeys.size() + schema.clusteringKeys.size());
+                    }
+                    writer.write(values);
+                });
+            }));
             int sstableCount = numSSTables;
 
             // Write any custom SSTables e.g. overwriting existing data or tombstones
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
index 822a8cf..fcd3712 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
@@ -40,6 +40,12 @@ public class EmptyStreamScanner implements StreamScanner<Rid>
     {
     }
 
+    @Override
+    public boolean hasMoreColumns()
+    {
+        return false;
+    }
+
     @Override
     public void close()
     {
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
index 2a4e975..cdd0c72 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
@@ -77,4 +77,9 @@ public interface StreamScanner<Type> extends Closeable
      * @throws IOException
      */
     void advanceToNextColumn() throws IOException;
+
+    /**
+     * @return {@code true} if the scanner has more columns to consume, {@code false} otherwise
+     */
+    boolean hasMoreColumns();
 }
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java
index 1103153..5228c17 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java
@@ -137,6 +137,12 @@ public class IndexIterator<ReaderType extends IIndexReader> implements StreamSca
         return finished.get() == readers.size();
     }
 
+    @Override
+    public boolean hasMoreColumns()
+    {
+        return true;
+    }
+
     public void advanceToNextColumn()
     {
         if (closed.get())
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
index fb33da4..2e6b0a8 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
@@ -35,6 +35,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -500,16 +501,33 @@ public final class TestSchema
 
     public TestRow randomRow()
     {
-        Object[] values = new Object[allFields.size()];
-        for (CqlField field : allFields)
+        return randomRow(false);
+    }
+
+    public TestRow randomRow(boolean nullifyValueColumn)
+    {
+        return randomRow(field -> nullifyValueColumn && field.isValueColumn());
+    }
+
+    private TestRow randomRow(Predicate<CqlField> nullifiedFields)
+    {
+        final Object[] values = new Object[allFields.size()];
+        for (final CqlField field : allFields)
         {
-            if (field.type().getClass().getSimpleName().equals("Blob") && blobSize != null)
+            if (nullifiedFields.test(field))
             {
-                values[field.position()] = RandomUtils.randomByteBuffer(blobSize);
+                values[field.position()] = null;
             }
             else
             {
-                values[field.position()] = field.type().randomValue(minCollectionSize);
+                if (field.type().getClass().getSimpleName().equals("Blob") && blobSize != null)
+                {
+                    values[field.position()] = RandomUtils.randomByteBuffer(blobSize);
+                }
+                else
+                {
+                    values[field.position()] = field.type().randomValue(minCollectionSize);
+                }
             }
         }
         return new TestRow(values);
diff --git a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
index 3ebe865..460ced0 100644
--- a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
+++ b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
@@ -125,6 +125,12 @@ public abstract class AbstractStreamScanner implements StreamScanner<Rid>, Close
         columnData.consume();
     }
 
+    @Override
+    public boolean hasMoreColumns()
+    {
+        return columns != null && columns.hasNext();
+    }
+
     // CHECKSTYLE IGNORE: Long method
     @Override
     public boolean hasNext() throws IOException

