This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ac63b83  CASSANDRA-20259: Refactor to decouple RowIterator and CellIterator fr… (#97)
ac63b83 is described below

commit ac63b8309119a47c14818962770e48e6bbfed25e
Author: jberragan <[email protected]>
AuthorDate: Wed Jan 29 21:11:23 2025 -0800

    CASSANDRA-20259: Refactor to decouple RowIterator and CellIterator fr… (#97)
    
    Patch by James Berragan; Reviewed by Francisco Guerrero, Yifan Cai for CASSANDRA-20259
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/analytics/stats/Stats.java    |   0
 .../org/apache/cassandra/spark/reader/RowData.java |   2 +-
 .../cassandra/spark/reader/StreamScanner.java      |   2 +-
 .../org/apache/cassandra/spark/sparksql/Cell.java  |  10 +-
 .../cassandra/spark/sparksql/CellIterator.java     | 132 ++++----
 .../cassandra/spark/sparksql/FullRowBuilder.java   |  39 ++-
 .../spark/sparksql/PartialRowBuilder.java          | 136 ++++++++
 .../cassandra/spark/sparksql/RowBuilder.java       |   5 +-
 .../cassandra/spark/sparksql/RowIterator.java      | 105 +++----
 .../cassandra/spark/data/CassandraDataLayer.java   |   7 +-
 .../spark/sparksql/AbstractSparkRowIterator.java   | 121 ++------
 .../spark/sparksql/SparkCellIterator.java          | 341 ++-------------------
 .../cassandra/spark/sparksql/SparkRowIterator.java |  32 +-
 .../spark/sparksql/PartitionSizeIterator.java      |   2 +-
 .../cassandra/spark/sparksql/SparkRowIterator.java | 145 ++-------
 .../cassandra/spark/config/SchemaFeature.java      |   3 +-
 .../cassandra/spark/config/SchemaFeatureSet.java   |   5 +-
 .../sparksql/LastModifiedTimestampDecorator.java   |   9 +-
 .../spark/sparksql/RowBuilderDecorator.java        |  10 +-
 .../spark/reader/AbstractStreamScanner.java        |   4 +-
 21 files changed, 411 insertions(+), 700 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 557f828..b941a3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Refactor to decouple RowIterator and CellIterator from Spark so bulk reads can be performed outside of Spark (CASSANDRA-20259)
  * CEP-44 Kafka integration for Cassandra CDC using Sidecar (CASSANDRA-19962)
  * Expose detailed bulk write failure message for better insight (CASSANDRA-20066)
  * Add dataTransferApi and TwoPhaseImportCoordinator for coordinated write (CASSANDRA-19994)
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/analytics/stats/Stats.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/analytics/stats/Stats.java
similarity index 100%
rename from cassandra-bridge/src/main/java/org/apache/cassandra/analytics/stats/Stats.java
rename to cassandra-analytics-common/src/main/java/org/apache/cassandra/analytics/stats/Stats.java
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/RowData.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/RowData.java
index 4983216..b77295f 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/RowData.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/RowData.java
@@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
 
 /**
- * Rid - Row Identifier - contains the partition key, clustering keys and column name that uniquely identifies a row and column of data in Cassandra
+ * RowData contains the partition key, clustering keys and column name that uniquely identifies a row and column of data in Cassandra
  */
 public class RowData
 {
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
index 322f9f0..5f5a667 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
@@ -48,7 +48,7 @@ import java.io.IOException;
  * at which point the implementation should make a copy of the provided bytes.
  * <p>
  * Upon return from the next() call the current values of the scanner can be obtained by calling
- * the methods in Rid, getPartitionKey(), getColumnName(), getValue().
+ * the methods in getPartitionKey(), getColumnName(), getValue().
  *
  * @param <T> type of object returned by rid() method.
  */
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/Cell.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/Cell.java
index eb8e2d7..c22beb5 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/Cell.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/Cell.java
@@ -19,12 +19,12 @@
 
 package org.apache.cassandra.spark.sparksql;
 
-class Cell
+public class Cell
 {
-    final Object[] values;
-    final int position;
-    final boolean isNewRow;
-    final long timestamp;
+    public final Object[] values;
+    public final int position;
+    public final boolean isNewRow;
+    public final long timestamp;
 
     Cell(Object[] values, int position, boolean isNewRow, long timestamp)
     {
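
[Illustrative note, not part of the commit] With Cell and its fields widened to public, callers outside the sparksql package can drain cells directly from a cell iterator (see the CellIterator introduced below). A minimal sketch, assuming an already-constructed CellIterator subclass named 'it' (hypothetical):

    // Sketch only: CellIterator implements Iterator<Cell> and AutoCloseable,
    // and close() may throw IOException, hence try-with-resources.
    try (it)
    {
        while (it.hasNext())
        {
            Cell cell = it.next();
            if (cell.isNewRow)
            {
                // cell.values holds partition/clustering/static columns in column order
                System.out.println("new row, first key = " + cell.values[0]);
            }
            // cell.position is the column position this cell's value belongs to;
            // cell.timestamp carries the cell's write timestamp
        }
    }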
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
similarity index 75%
copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
copy to cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
index 5247550..971b545 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
@@ -20,47 +20,41 @@
 package org.apache.cassandra.spark.sparksql;
 
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.nio.charset.CharacterCodingException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
+import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.data.converter.types.SparkType;
+import org.apache.cassandra.spark.data.TypeConverter;
 import org.apache.cassandra.spark.reader.RowData;
-import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
-import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.spark.utils.ByteBufferUtils;
-import org.apache.cassandra.spark.utils.FastThreadLocalUtf8Decoder;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Iterate through CompactionIterator, deserializing ByteBuffers and normalizing into Object[] array in column order
  */
-public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
+public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkCellIterator.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(CellIterator.class);
 
-    protected final DataLayer dataLayer;
     private final Stats stats;
-    private final CqlTable cqlTable;
+    protected final CqlTable cqlTable;
     private final Object[] values;
-    private final SparkType[] sparkTypes;
     @Nullable
-    protected final PruneColumnFilter columnFilter;
+    private final PruneColumnFilter columnFilter;
     private final long startTimeNanos;
     @NotNull
     private final StreamScanner<RowData> scanner;
@@ -76,19 +70,35 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
 
     protected final int partitionId;
     protected final int firstProjectedValueColumnPositionOrZero;
-    protected final boolean hasProjectedValueColumns;
-    private final SparkSqlTypeConverter sparkSqlTypeConverter;
+    private final boolean hasProjectedValueColumns;
+    protected final TypeConverter typeConverter;
+
+    public interface ScannerSupplier
+    {
+        /**
+         * @param partitionId         arbitrary id uniquely identifying this partition of the bulk read
+         * @param partitionKeyFilters list of partition key filters to push-down
+         * @param columnFilter        optional column filter to only read certain columns
+         * @return a StreamScanner to iterate over each cell of the data.
+         */
+        StreamScanner<RowData> get(int partitionId,
+                                   @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                                   @Nullable PruneColumnFilter columnFilter);
+    }
 
-    public SparkCellIterator(int partitionId,
-                             @NotNull DataLayer dataLayer,
-                             @Nullable StructType requiredSchema,
-                             @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    public CellIterator(int partitionId,
+                        CqlTable cqlTable,
+                        Stats stats,
+                        TypeConverter typeConverter,
+                        @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                        Function<CqlTable, PruneColumnFilter> columnFilterSupplier,
+                        ScannerSupplier scannerSupplier)
     {
         this.partitionId = partitionId;
-        this.dataLayer = dataLayer;
-        stats = dataLayer.stats();
-        cqlTable = dataLayer.cqlTable();
-        columnFilter = buildColumnFilter(requiredSchema, cqlTable);
+        this.stats = stats;
+        this.cqlTable = cqlTable;
+        this.typeConverter = typeConverter;
+        this.columnFilter = columnFilterSupplier.apply(cqlTable);
         if (columnFilter != null)
         {
             LOGGER.info("Adding prune column filter columns='{}'", 
String.join(",", columnFilter.requiredColumns()));
@@ -106,37 +116,18 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         // Open compaction scanner
         startTimeNanos = System.nanoTime();
         previousTimeNanos = startTimeNanos;
-        scanner = openScanner(partitionId, partitionKeyFilters);
+        scanner = scannerSupplier.get(partitionId, partitionKeyFilters, columnFilter);
         long openTimeNanos = System.nanoTime() - startTimeNanos;
         LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
         stats.openedCompactionScanner(openTimeNanos);
         rowData = scanner.data();
         stats.openedSparkCellIterator();
         firstProjectedValueColumnPositionOrZero = maybeGetPositionOfFirstProjectedValueColumnOrZero();
-
-        sparkSqlTypeConverter = dataLayer.typeConverter();
-        sparkTypes = new SparkType[cqlTable.numFields()];
-        for (int index = 0; index < cqlTable.numFields(); index++)
-        {
-            sparkTypes[index] = sparkSqlTypeConverter.toSparkType(cqlTable.field(index).type());
-        }
     }
 
-    protected StreamScanner<RowData> openScanner(int partitionId,
-                                                 @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    public CqlTable cqlTable()
     {
-        return dataLayer.openCompactionScanner(partitionId, partitionKeyFilters, columnFilter);
-    }
-
-    @Nullable
-    static PruneColumnFilter buildColumnFilter(@Nullable StructType requiredSchema, @NotNull CqlTable cqlTable)
-    {
-        return requiredSchema != null
-               ? new PruneColumnFilter(Arrays.stream(requiredSchema.fields())
-                                             .map(StructField::name)
-                                             .filter(cqlTable::has)
-                                             .collect(Collectors.toSet()))
-               : null;
+        return cqlTable;
     }
 
     public boolean hasProjectedValueColumns()
@@ -197,12 +188,12 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
             }
 
             // Deserialize clustering keys - if moved to new CQL row - and update 'values' Object[] array
-            ByteBuffer columnNameBuf = Objects.requireNonNull(rowData.getColumnName(), "ColumnName buffer in Rid is null, this is unexpected");
+            ByteBuffer columnNameBuf = Objects.requireNonNull(rowData.getColumnName(), "ColumnName buffer in RowData is null, this is unexpected");
             maybeRebuildClusteringKeys(columnNameBuf);
 
             // Deserialize CQL field column name
             ByteBuffer component = ByteBufferUtils.extractComponent(columnNameBuf, cqlTable.numClusteringKeys());
-            String columnName = component != null ? FastThreadLocalUtf8Decoder.stringThrowRuntime(component) : null;
+            String columnName = decodeString(component);
             if (columnName == null || columnName.isEmpty())
             {
                 if (!hasProjectedValueColumns || !scanner.hasMoreColumns())
@@ -251,6 +242,11 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         return false;
     }
 
+    protected String decodeString(@Nullable ByteBuffer buffer) throws CharacterCodingException
+    {
+        return buffer == null ? null : ByteBufferUtils.string(buffer);
+    }
+
     @Override
     public void close() throws IOException
     {
@@ -286,7 +282,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
             values[field.position()] = null;
         }
 
-        skipPartition = !dataLayer.isInPartition(partitionId, rowData.getToken(), rowData.getPartitionKey());
+        skipPartition = !isInPartition(partitionId, rowData.getToken(), rowData.getPartitionKey());
         if (skipPartition)
         {
             stats.skippedPartitionInIterator(rowData.getPartitionKey(), rowData.getToken());
@@ -294,10 +290,30 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         }
 
         // Or new partition, so deserialize partition keys and update 'values' array
-        readPartitionKey(sparkSqlTypeConverter, rowData.getPartitionKey(), cqlTable, this.values, stats);
+        readPartitionKey(typeConverter, rowData.getPartitionKey(), cqlTable, this.values, stats);
     }
 
-    public static void readPartitionKey(SparkSqlTypeConverter sparkSqlTypeConverter,
+    /**
+     * @param partitionId  partition id of this bulk reader partition
+     * @param token        Cassandra token of partition key
+     * @param partitionKey raw ByteBuffer of partition key
+     * @return true if this partitionKey is within the token range of this bulk reader partition.
+     * Partition keys outside the token range can be ignored by this iterator.
+     */
+    public abstract boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey);
+
+    /**
+     * Return true if two values are equal for a given CqlField; this is primarily used to
+     * detect if the iterator has transitioned to a new clustering key (and so a new primary key) and should emit a new row.
+     *
+     * @param field CqlField of the column being compared.
+     * @param obj1  first value
+     * @param obj2  second value
+     * @return true if obj1 equals obj2 for a given CqlType.
+     */
+    public abstract boolean equals(CqlField field, Object obj1, Object obj2);
+
+    public static void readPartitionKey(TypeConverter typeConverter,
                                         ByteBuffer partitionKey,
                                         CqlTable table,
                                         Object[] values,
@@ -307,7 +323,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         {
             // Not a composite partition key
             CqlField field = table.partitionKeys().get(0);
-            values[field.position()] = deserialize(sparkSqlTypeConverter, field, partitionKey, stats);
+            values[field.position()] = deserialize(typeConverter, field, partitionKey, stats);
         }
         else
         {
@@ -316,7 +332,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
             int index = 0;
             for (CqlField field : table.partitionKeys())
             {
-                values[field.position()] = deserialize(sparkSqlTypeConverter, field, partitionKeyBufs[index++], stats);
+                values[field.position()] = deserialize(typeConverter, field, partitionKeyBufs[index++], stats);
             }
         }
     }
@@ -340,7 +356,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
             // Historically, we compare equality of clustering keys using the Spark types
             // to determine if we have moved to a new 'row'. We could also compare using the Cassandra types
             // or the raw ByteBuffers before converting to Spark types - this might be slightly more performant.
-            if (newRow || oldObj == null || newObj == null || !sparkTypes[field.position()].equals(newObj, oldObj))
+            if (newRow || oldObj == null || newObj == null || !equals(field, newObj, oldObj))
             {
                 newRow = true;
                 values[field.position()] = newObj;
@@ -370,13 +386,13 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
 
     private Object deserialize(CqlField field, ByteBuffer buffer)
     {
-        return deserialize(sparkSqlTypeConverter, field, buffer, stats);
+        return deserialize(typeConverter, field, buffer, stats);
     }
 
-    private static Object deserialize(SparkSqlTypeConverter sparkSqlTypeConverter, CqlField field, ByteBuffer buffer, Stats stats)
+    private static Object deserialize(TypeConverter typeConverter, CqlField field, ByteBuffer buffer, Stats stats)
     {
         long now = System.nanoTime();
-        Object value = buffer == null ? null : field.deserializeToType(sparkSqlTypeConverter, buffer);
+        Object value = buffer == null ? null : field.deserializeToType(typeConverter, buffer);
         stats.fieldDeserialization(field, System.nanoTime() - now);
         return value;
     }
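
[Illustrative note, not part of the commit] Taken together, these hunks make CellIterator Spark-free: a consumer supplies the CqlTable, Stats, TypeConverter, partition key filters, a prune-column-filter supplier and a ScannerSupplier, then implements the two abstract hooks. A rough sketch of a non-Spark subclass; MyDataSource and its methods are hypothetical stand-ins:

    import java.math.BigInteger;
    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.Objects;
    import org.apache.cassandra.spark.data.CqlField;

    public class PlainCellIterator extends CellIterator
    {
        public PlainCellIterator(int partitionId, MyDataSource source)
        {
            super(partitionId,
                  source.cqlTable(),               // CqlTable describing the schema (hypothetical accessor)
                  source.stats(),                  // any Stats implementation
                  source.typeConverter(),          // a TypeConverter; need not be Spark's
                  Collections.emptyList(),         // no partition key push-down
                  table -> null,                   // no prune-column filter
                  source::openCompactionScanner);  // ScannerSupplier opening a StreamScanner<RowData>
        }

        @Override
        public boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey)
        {
            return true;  // accept everything; a real reader would check its token range here
        }

        @Override
        public boolean equals(CqlField field, Object obj1, Object obj2)
        {
            return Objects.equals(obj1, obj2);  // naive equality; the Spark subclass compares via SparkType
        }
    }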
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
similarity index 81%
rename from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
rename to cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
index 5a06343..630328c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
@@ -19,34 +19,36 @@
 
 package org.apache.cassandra.spark.sparksql;
 
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.IntStream;
+import java.util.Optional;
+import java.util.function.Function;
 
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
 /**
  * FullRowBuilder expects all fields in the schema to be returned, i.e. no prune column filter
+ *
+ * @param <T> type of row returned by builder
  */
-class FullRowBuilder implements RowBuilder
+public class FullRowBuilder<T> implements RowBuilder<T>
 {
     static final Object[] EMPTY_RESULT = new Object[0];
-    final int numColumns;
-    final int numCells;
-    final boolean hasProjectedValueColumns;
-    int extraColumns;
-    Object[] result;
-    int count;
+    protected final int numColumns;
+    protected final int numCells;
+    protected final boolean hasProjectedValueColumns;
+    protected int extraColumns;
+    protected Object[] result;
+    protected int count;
     private final CqlTable cqlTable;
+    protected final Function<Object[], T> rowBuilder;
 
-    FullRowBuilder(CqlTable cqlTable, boolean hasProjectedValueColumns)
+    FullRowBuilder(CqlTable cqlTable, boolean hasProjectedValueColumns, Function<Object[], T> rowBuilder)
     {
         this.cqlTable = cqlTable;
         this.numColumns = cqlTable.numFields();
         this.hasProjectedValueColumns = hasProjectedValueColumns;
         this.numCells = cqlTable.numNonValueColumns() + (hasProjectedValueColumns ? 1 : 0);
+        this.rowBuilder = rowBuilder;
     }
 
     @Override
@@ -133,16 +135,13 @@ class FullRowBuilder implements RowBuilder
     @Override
     public int fieldIndex(String name)
     {
-        List<CqlField> fields = cqlTable.fields();
-        return IntStream.range(0, fields.size())
-                        .filter(i -> Objects.equals(fields.get(i).name(), name))
-                        .findFirst()
-                        .orElse(-1);
+        return Optional.ofNullable(cqlTable.getField(name))
+                       .map(CqlField::position)
+                       .orElse(-1);
     }
 
-    @Override
-    public GenericInternalRow build()
+    public T build()
     {
-        return new GenericInternalRow(result);
+        return rowBuilder.apply(result);
     }
 }
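
[Illustrative note, not part of the commit] The new Function<Object[], T> hook is what removes the GenericInternalRow dependency: the Spark layer can pass GenericInternalRow::new, while any other consumer maps the values array to its own row type. A hedged illustration (PlainRow is hypothetical, and since the constructor is package-private this would live in the same package):

    // Hypothetical row type wrapping the pivoted values
    record PlainRow(Object[] columns) { }

    // 'table' is a CqlTable; 'true' means a value column is projected
    FullRowBuilder<PlainRow> plain = new FullRowBuilder<>(table, true, PlainRow::new);
    // The Spark path can instead pass GenericInternalRow::new to produce Spark rows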
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
new file mode 100644
index 0000000..6df672c
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * PartialRowBuilder that builds a row containing only the fields in the requiredSchema prune-column filter.
+ * NOTE: Spark 3 changed the contract from Spark 2 and requires us to only return the columns specified in
+ * the requiredSchema 'prune column' filter and not a sparse Object[] array with null values for excluded columns
+ *
+ * @param <T> type of row returned by builder
+ */
+public class PartialRowBuilder<T> extends FullRowBuilder<T>
+{
+    private final int[] positionsMap;
+    private final boolean hasAllNonValueColumns;
+    protected final String[] requiredSchema;
+    private final Map<String, Integer> columnIndex;
+
+    public PartialRowBuilder(@NotNull String[] requiredSchema,
+                             CqlTable table,
+                             boolean hasProjectedValueColumns,
+                             Function<Object[], T> rowBuilder)
+    {
+        super(table, hasProjectedValueColumns, rowBuilder);
+        this.requiredSchema = requiredSchema;
+        this.columnIndex = new HashMap<>(requiredSchema.length);
+        for (int i = 0; i < requiredSchema.length; i++)
+        {
+            columnIndex.put(requiredSchema[i], i);
+        }
+        Set<String> requiredColumns = Arrays.stream(requiredSchema).collect(Collectors.toSet());
+        hasAllNonValueColumns = table.fields().stream()
+                                     .filter(CqlField::isNonValueColumn)
+                                     .map(CqlField::name)
+                                     .allMatch(requiredColumns::contains);
+
+        // Map original column position to new position in requiredSchema
+        positionsMap = IntStream.range(0, table.numFields())
+                                .map(position -> -1)
+                                .toArray();
+        int position = 0;
+        for (String fieldName : requiredSchema)
+        {
+            CqlField field = table.getField(fieldName);
+            if (field != null)  // Field might be last modified timestamp
+            {
+                positionsMap[field.position()] = position++;
+            }
+        }
+    }
+
+    @Override
+    public void reset()
+    {
+        count = 0;
+        int totalColumns = requiredSchema.length;
+        if (totalColumns > 0)
+        {
+            result = new Object[totalColumns];
+        }
+        else
+        {
+            result = EMPTY_RESULT;
+        }
+    }
+
+    @Override
+    public int fieldIndex(String name)
+    {
+        return requiredSchema != null ? columnIndex.get(name) : super.fieldIndex(name);
+    }
+
+    @Override
+    public void copyKeys(Cell cell)
+    {
+        if (hasAllNonValueColumns)
+        {
+            // Optimization: if we are returning all primary key/static columns, we can use the super method
+            super.copyKeys(cell);
+            return;
+        }
+
+        // Otherwise we need to only return columns requested and map to new position in result array
+        int length = !hasProjectedValueColumns ? cell.values.length : cell.values.length - 1;
+        for (int index = 0; index < length; index++)
+        {
+            int position = positionsMap[index];
+            if (position >= 0)
+            {
+                result[position] = cell.values[index];
+            }
+        }
+        count += length;
+    }
+
+    @Override
+    public void copyValue(Cell cell)
+    {
+        // Copy the next value column, mapping the column to its new position
+        int position = positionsMap[cell.position];
+        if (position >= 0)
+        {
+            result[position] = cell.values[cell.values.length - 1];
+        }
+        count++;
+    }
+}
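
[Illustrative note, not part of the commit] A worked example of the positionsMap built above, under the assumption of a table with fields a, b, c, d at positions 0-3 and requiredSchema ["c", "a"]:

    // construction follows requiredSchema order: "c" -> positionsMap[2] = 0; "a" -> positionsMap[0] = 1
    // result: positionsMap = [1, -1, 0, -1]  (-1 marks excluded columns b and d)
    // copyKeys/copyValue therefore drop b and d and write c and a into pruned positions 0 and 1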
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
similarity index 91%
rename from cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
rename to cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
index e6cc452..8ec14f2 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
@@ -20,9 +20,8 @@
 package org.apache.cassandra.spark.sparksql;
 
 import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
-public interface RowBuilder
+public interface RowBuilder<T>
 {
     CqlTable getCqlTable();
 
@@ -54,5 +53,5 @@ public interface RowBuilder
      */
     int expandRow(int extraColumns);
 
-    GenericInternalRow build();
+    T build();
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
similarity index 51%
copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
copy to cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
index 1fdffd0..6d3382c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
@@ -20,85 +20,73 @@
 package org.apache.cassandra.spark.sparksql;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.cassandra.spark.config.SchemaFeature;
-import org.apache.cassandra.spark.data.CqlField;
-import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import java.util.function.Function;
+
 import org.apache.cassandra.analytics.stats.Stats;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL rows
+ * Wrapper iterator around CellIterator to normalize cells into Spark SQL rows
+ *
+ * @param <T> type of row returned by Iterator.
  */
-abstract class AbstractSparkRowIterator
+public abstract class RowIterator<T>
 {
     private final Stats stats;
-    private final SparkCellIterator it;
+    protected final CellIterator it;
     private final long openTimeNanos;
-    private final RowBuilder builder;
+    private final RowBuilder<T> builder;
 
-    protected final List<SchemaFeature> requestedFeatures;
-    protected final CqlTable cqlTable;
-    protected final StructType columnFilter;
-    protected final boolean hasProjectedValueColumns;
+    // NOTE: requiredColumns might contain additional decorated fields like last_modified_timestamp,
+    // but PruneColumnFilter is pushed down to the SSTableReader so only contains the real table fields
+    @Nullable
+    protected final String[] requiredColumns;
 
+    protected T row;
     private Cell cell = null;
-    private InternalRow row = null;
 
-    AbstractSparkRowIterator(int partitionId,
-                             @NotNull DataLayer dataLayer,
-                             @Nullable StructType requiredSchema,
-                             @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    protected RowIterator(CellIterator it,
+                          Stats stats,
+                          @Nullable String[] requiredColumns,
+                          Function<RowBuilder<T>, RowBuilder<T>> decorator)
     {
-        this.stats = dataLayer.stats();
-        this.cqlTable = dataLayer.cqlTable();
-        this.columnFilter = useColumnFilter(requiredSchema, cqlTable) ? requiredSchema : null;
-        this.it = buildCellIterator(partitionId, dataLayer, columnFilter, partitionKeyFilters);
-        this.stats.openedSparkRowIterator();
+        this.stats = stats;
+        this.it = it;
         this.openTimeNanos = System.nanoTime();
-        this.requestedFeatures = dataLayer.requestedFeatures();
-        this.hasProjectedValueColumns = it.hasProjectedValueColumns();
-        this.builder = newBuilder();
-    }
-
-    protected SparkCellIterator buildCellIterator(int partitionId,
-                                                  @NotNull DataLayer dataLayer,
-                                                  @Nullable StructType columnFilter,
-                                                  @NotNull List<PartitionKeyFilter> partitionKeyFilters)
-    {
-        return new SparkCellIterator(partitionId, dataLayer, columnFilter, partitionKeyFilters);
+        this.requiredColumns = requiredColumns;
+        this.builder = newBuilder(decorator);
     }
 
-    private static boolean useColumnFilter(@Nullable StructType requiredSchema, CqlTable cqlTable)
+    @NotNull
+    protected RowBuilder<T> newBuilder(Function<RowBuilder<T>, RowBuilder<T>> decorator)
     {
-        if (requiredSchema == null)
+        RowBuilder<T> builder;
+        if (requiredColumns != null)
         {
-            return false;
+            builder = newPartialBuilder();
+        }
+        else
+        {
+            builder = newFullRowBuilder();
         }
-        // Only use column filter if it excludes any of the CqlTable fields
-        Set<String> requiredFields = Arrays.stream(requiredSchema.fields()).map(StructField::name).collect(Collectors.toSet());
-        return cqlTable.fields().stream()
-                       .map(CqlField::name)
-                       .anyMatch(field -> !requiredFields.contains(field));
-    }
 
-    abstract RowBuilder newBuilder();
+        builder = decorator.apply(builder);
 
-    public InternalRow get()
-    {
-        return row;
+        builder.reset();
+        return builder;
     }
 
+    /**
+     * @return an instance of a PartialRowBuilder that builds a row with a subset of columns in the schema and maps to the generic type.
+     */
+    public abstract PartialRowBuilder<T> newPartialBuilder();
+
+    /**
+     * @return an instance of a FullRowBuilder that builds a row with all columns in the schema and maps to the generic type.
+     */
+    public abstract FullRowBuilder<T> newFullRowBuilder();
+
     public boolean next() throws IOException
     {
         // We are finished if not already reading a row (if cell != null, it can happen if previous row was incomplete)
@@ -132,7 +120,7 @@ abstract class AbstractSparkRowIterator
 
             builder.onCell(cell);
 
-            if (hasProjectedValueColumns)
+            if (it.hasProjectedValueColumns())
             {
                 // If schema has value column
                 builder.copyValue(cell);
@@ -154,4 +142,9 @@ abstract class AbstractSparkRowIterator
         stats.closedSparkRowIterator(System.nanoTime() - openTimeNanos);
         it.close();
     }
+
+    public T get()
+    {
+        return row;
+    }
 }
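
[Illustrative note, not part of the commit] RowIterator<T> now owns the cell-to-row pivot loop; subclasses only decide how full and partial rows are built. A rough sketch of a non-Spark subclass, reusing the hypothetical PlainCellIterator and PlainRow from the notes above (and, like them, living in the same package since the builder constructors are package-private):

    import java.util.function.Function;
    import org.apache.cassandra.analytics.stats.Stats;

    public class PlainRowIterator extends RowIterator<PlainRow>
    {
        public PlainRowIterator(PlainCellIterator it, Stats stats)
        {
            // no pruned schema, no decoration: null requiredColumns and the identity decorator
            super(it, stats, null, Function.identity());
        }

        @Override
        public PartialRowBuilder<PlainRow> newPartialBuilder()
        {
            return new PartialRowBuilder<>(requiredColumns, it.cqlTable(), it.hasProjectedValueColumns(), PlainRow::new);
        }

        @Override
        public FullRowBuilder<PlainRow> newFullRowBuilder()
        {
            return new FullRowBuilder<>(it.cqlTable(), it.hasProjectedValueColumns(), PlainRow::new);
        }
    }

    // usage: while (rows.next()) { PlainRow row = rows.get(); ... } then rows.close()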
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 804af94..983cd8c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -62,6 +62,7 @@ import o.a.c.sidecar.client.shaded.common.response.ListSnapshotFilesResponse;
 import o.a.c.sidecar.client.shaded.common.response.NodeSettings;
 import o.a.c.sidecar.client.shaded.common.response.RingResponse;
 import o.a.c.sidecar.client.shaded.common.response.SchemaResponse;
+import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.bridge.BigNumberConfig;
 import org.apache.cassandra.bridge.BigNumberConfigImpl;
 import org.apache.cassandra.bridge.CassandraBridge;
@@ -86,7 +87,6 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
 import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator;
 import org.apache.cassandra.spark.sparksql.RowBuilder;
-import org.apache.cassandra.analytics.stats.Stats;
 import org.apache.cassandra.spark.utils.CqlUtils;
 import org.apache.cassandra.spark.utils.ReaderTimeProvider;
 import org.apache.cassandra.spark.utils.ScalaFunctions;
@@ -96,6 +96,7 @@ import org.apache.cassandra.spark.validation.CassandraValidation;
 import org.apache.cassandra.spark.validation.SidecarValidation;
 import org.apache.cassandra.spark.validation.StartupValidatable;
 import org.apache.cassandra.spark.validation.StartupValidator;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.util.ShutdownHookManager;
 import org.jetbrains.annotations.NotNull;
@@ -1017,9 +1018,9 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
             }
 
             @Override
-            public RowBuilder decorate(RowBuilder builder)
+            public <T extends InternalRow> RowBuilder<T> decorate(RowBuilder<T> builder)
             {
-                return new LastModifiedTimestampDecorator(builder, alias);
+                return new LastModifiedTimestampDecorator<>(builder, alias);
             }
 
             @Override
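
[Illustrative note, not part of the commit] The decorate signature now threads the generic row type through the decorator chain, so a SchemaFeature can wrap whatever RowBuilder<T> it receives. A sketch of the chaining, assuming SchemaFeature declares the same generic decorate method overridden here:

    // Hypothetical helper: wrap a base builder with each requested feature
    static <T extends InternalRow> RowBuilder<T> decorateAll(RowBuilder<T> base, List<SchemaFeature> features)
    {
        RowBuilder<T> builder = base;
        for (SchemaFeature feature : features)
        {
            builder = feature.decorate(builder);  // e.g. LastModifiedTimestampDecorator<> wraps the builder
        }
        return builder;
    }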
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
index 1fdffd0..85fd17b 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
@@ -19,19 +19,17 @@
 
 package org.apache.cassandra.spark.sparksql;
 
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.apache.cassandra.spark.config.SchemaFeature;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.DataLayer;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
-import org.apache.cassandra.analytics.stats.Stats;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
@@ -39,43 +37,32 @@ import org.jetbrains.annotations.Nullable;
 
 /**
  * Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL rows
+ *
+ * @param <T> type of row returned by Iterator.
  */
-abstract class AbstractSparkRowIterator
+abstract class AbstractSparkRowIterator<T> extends RowIterator<T>
 {
-    private final Stats stats;
-    private final SparkCellIterator it;
-    private final long openTimeNanos;
-    private final RowBuilder builder;
-
-    protected final List<SchemaFeature> requestedFeatures;
-    protected final CqlTable cqlTable;
-    protected final StructType columnFilter;
-    protected final boolean hasProjectedValueColumns;
-
-    private Cell cell = null;
-    private InternalRow row = null;
-
     AbstractSparkRowIterator(int partitionId,
                              @NotNull DataLayer dataLayer,
                              @Nullable StructType requiredSchema,
-                             @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+                             @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                             Function<RowBuilder<T>, RowBuilder<T>> decorator)
     {
-        this.stats = dataLayer.stats();
-        this.cqlTable = dataLayer.cqlTable();
-        this.columnFilter = useColumnFilter(requiredSchema, cqlTable) ? requiredSchema : null;
-        this.it = buildCellIterator(partitionId, dataLayer, columnFilter, partitionKeyFilters);
-        this.stats.openedSparkRowIterator();
-        this.openTimeNanos = System.nanoTime();
-        this.requestedFeatures = dataLayer.requestedFeatures();
-        this.hasProjectedValueColumns = it.hasProjectedValueColumns();
-        this.builder = newBuilder();
+        super(
+        buildCellIterator(partitionId, dataLayer.cqlTable(), requiredSchema, dataLayer, partitionKeyFilters),
+        dataLayer.stats(),
+        requiredSchema == null ? null : requiredSchema.fieldNames(),
+        decorator
+        );
     }
 
-    protected SparkCellIterator buildCellIterator(int partitionId,
-                                                  @NotNull DataLayer dataLayer,
-                                                  @Nullable StructType columnFilter,
-                                                  @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    protected static CellIterator buildCellIterator(int partitionId,
+                                                    CqlTable cqlTable,
+                                                    @Nullable StructType requiredSchema,
+                                                    @NotNull DataLayer dataLayer,
+                                                    @NotNull List<PartitionKeyFilter> partitionKeyFilters)
     {
+        StructType columnFilter = useColumnFilter(requiredSchema, cqlTable) ? requiredSchema : null;
         return new SparkCellIterator(partitionId, dataLayer, columnFilter, partitionKeyFilters);
     }
 
@@ -92,66 +79,24 @@ abstract class AbstractSparkRowIterator
                        .anyMatch(field -> !requiredFields.contains(field));
     }
 
-    abstract RowBuilder newBuilder();
+    /**
+     * Maps the Object[] valueArray generated by the RowIterator to the expected type
+     *
+     * @param valueArray
+     * @return a value of type `T`
+     */
+    public abstract T rowBuilder(Object[] valueArray);
 
-    public InternalRow get()
+    @Override
+    public PartialRowBuilder<T> newPartialBuilder()
     {
-        return row;
-    }
-
-    public boolean next() throws IOException
-    {
-        // We are finished if not already reading a row (if cell != null, it can happen if previous row was incomplete)
-        // and SparkCellIterator has no next value
-        if (cell == null && !it.hasNextThrows())
-        {
-            return false;
-        }
-
-        // Pivot values to normalize each cell into single SparkSQL or 'CQL' type row
-        do
-        {
-            if (cell == null)
-            {
-                // Read next cell
-                cell = it.next();
-            }
-
-            if (builder.isFirstCell())
-            {
-                // On first iteration, copy all partition keys, clustering keys, static columns
-                assert cell.isNewRow;
-                builder.copyKeys(cell);
-            }
-            else if (cell.isNewRow)
-            {
-                // Current row is incomplete, so we have moved to new row before reaching end
-                // break out to return current incomplete row and handle next row in next iteration
-                break;
-            }
-
-            builder.onCell(cell);
-
-            if (hasProjectedValueColumns)
-            {
-                // If schema has value column
-                builder.copyValue(cell);
-            }
-            cell = null;
-            // Keep reading more cells until we read the entire row
-        } while (builder.hasMoreCells() && it.hasNextThrows());
-
-        // Build row and reset builder for next row
-        row = builder.build();
-        builder.reset();
-
-        stats.nextRow();
-        return true;
+        Objects.requireNonNull(requiredColumns, "requiredColumns must be non-null for PartialRowBuilder");
+        return new PartialRowBuilder<>(requiredColumns, it.cqlTable(), it.hasProjectedValueColumns(), this::rowBuilder);
     }
 
-    public void close() throws IOException
+    @Override
+    public FullRowBuilder<T> newFullRowBuilder()
     {
-        stats.closedSparkRowIterator(System.nanoTime() - openTimeNanos);
-        it.close();
+        return new FullRowBuilder<>(it.cqlTable(), it.hasProjectedValueColumns(), this::rowBuilder);
     }
 }
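
[Illustrative note, not part of the commit] The Spark-specific subclass now funnels row construction through the single rowBuilder(Object[]) hook, so a concrete Spark iterator presumably just wraps the pivoted values. A minimal hypothetical implementation for T = InternalRow:

    @Override
    public InternalRow rowBuilder(Object[] valueArray)
    {
        // wrap the pivoted values as a Spark row
        return new GenericInternalRow(valueArray);
    }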
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
index 5247550..e5d164c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
@@ -19,115 +19,51 @@
 
 package org.apache.cassandra.spark.sparksql;
 
-import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.data.converter.types.SparkType;
-import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter;
-import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.data.converter.types.SparkType;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
-import org.apache.cassandra.analytics.stats.Stats;
-import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.FastThreadLocalUtf8Decoder;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * Iterate through CompactionIterator, deserializing ByteBuffers and normalizing into Object[] array in column order
- */
-public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
+public class SparkCellIterator extends CellIterator
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkCellIterator.class);
-
-    protected final DataLayer dataLayer;
-    private final Stats stats;
-    private final CqlTable cqlTable;
-    private final Object[] values;
+    private final DataLayer dataLayer;
     private final SparkType[] sparkTypes;
-    @Nullable
-    protected final PruneColumnFilter columnFilter;
-    private final long startTimeNanos;
-    @NotNull
-    private final StreamScanner<RowData> scanner;
-    @NotNull
-    private final RowData rowData;
-
-    // Mutable Iterator State
-    private boolean skipPartition = false;
-    private boolean newRow = false;
-    private boolean closed = false;
-    private Cell next = null;
-    private long previousTimeNanos;
-
-    protected final int partitionId;
-    protected final int firstProjectedValueColumnPositionOrZero;
-    protected final boolean hasProjectedValueColumns;
-    private final SparkSqlTypeConverter sparkSqlTypeConverter;
 
     public SparkCellIterator(int partitionId,
                              @NotNull DataLayer dataLayer,
                              @Nullable StructType requiredSchema,
                              @NotNull List<PartitionKeyFilter> partitionKeyFilters)
     {
-        this.partitionId = partitionId;
+        super(partitionId,
+              dataLayer.cqlTable(),
+              dataLayer.stats(),
+              dataLayer.typeConverter(),
+              partitionKeyFilters,
+              (cqlTable) -> buildColumnFilter(requiredSchema, cqlTable),
+              dataLayer::openCompactionScanner);
         this.dataLayer = dataLayer;
-        stats = dataLayer.stats();
-        cqlTable = dataLayer.cqlTable();
-        columnFilter = buildColumnFilter(requiredSchema, cqlTable);
-        if (columnFilter != null)
-        {
-            LOGGER.info("Adding prune column filter columns='{}'", String.join(",", columnFilter.requiredColumns()));
-        }
-
-        hasProjectedValueColumns = cqlTable.numValueColumns() > 0 &&
-                                   cqlTable.valueColumns()
-                                           .stream()
-                                           .anyMatch(field -> columnFilter == null || columnFilter.requiredColumns().contains(field.name()));
-
-        // The value array copies across all the partition/clustering/static columns
-        // and the single column value for this cell to the SparkRowIterator
-        values = new Object[cqlTable.numNonValueColumns() + (hasProjectedValueColumns ? 1 : 0)];
-
-        // Open compaction scanner
-        startTimeNanos = System.nanoTime();
-        previousTimeNanos = startTimeNanos;
-        scanner = openScanner(partitionId, partitionKeyFilters);
-        long openTimeNanos = System.nanoTime() - startTimeNanos;
-        LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
-        stats.openedCompactionScanner(openTimeNanos);
-        rowData = scanner.data();
-        stats.openedSparkCellIterator();
-        firstProjectedValueColumnPositionOrZero = maybeGetPositionOfFirstProjectedValueColumnOrZero();
-
-        sparkSqlTypeConverter = dataLayer.typeConverter();
-        sparkTypes = new SparkType[cqlTable.numFields()];
+        this.sparkTypes = new SparkType[cqlTable.numFields()];
+        SparkSqlTypeConverter sparkSqlTypeConverter = ((SparkSqlTypeConverter) this.typeConverter);
         for (int index = 0; index < cqlTable.numFields(); index++)
         {
-            sparkTypes[index] = sparkSqlTypeConverter.toSparkType(cqlTable.field(index).type());
+            this.sparkTypes[index] = sparkSqlTypeConverter.toSparkType(cqlTable.field(index).type());
         }
     }
 
-    protected StreamScanner<RowData> openScanner(int partitionId,
-                                                 @NotNull List<PartitionKeyFilter> partitionKeyFilters)
-    {
-        return dataLayer.openCompactionScanner(partitionId, partitionKeyFilters, columnFilter);
-    }
-
     @Nullable
     static PruneColumnFilter buildColumnFilter(@Nullable StructType requiredSchema, @NotNull CqlTable cqlTable)
     {
@@ -139,258 +75,21 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
                : null;
     }
 
-    public boolean hasProjectedValueColumns()
-    {
-        return hasProjectedValueColumns;
-    }
-
     @Override
-    public boolean hasNext()
+    public boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey)
     {
-        try
-        {
-            return hasNextThrows();
-        }
-        catch (IOException exception)
-        {
-            throw new RuntimeException(exception);
-        }
-    }
-
-    public boolean hasNextThrows() throws IOException
-    {
-        if (next != null || closed)
-        {
-            return !closed;
-        }
-        return getNext();
+        return dataLayer.isInPartition(partitionId, token, partitionKey);
     }
 
     @Override
-    public Cell next()
+    public boolean equals(CqlField field, Object obj1, Object obj2)
     {
-        Cell result = next;
-        assert result != null;
-        next = null;
-        newRow = false;
-        long now = System.nanoTime();
-        stats.nextCell(now - previousTimeNanos);
-        previousTimeNanos = now;
-        return result;
-    }
-
-    private boolean getNext() throws IOException
-    {
-        while (scanner.next())
-        {
-            // If hasNext returns true, it indicates the partition keys has been loaded into the rid.
-            // Therefore, let's try to rebuild partition.
-            // Deserialize partition keys - if we have moved to a new partition - and update 'values' Object[] array.
-            maybeRebuildPartition();
-
-            scanner.advanceToNextColumn();
-
-            // Skip partition e.g. if token is outside of Spark worker token range
-            if (skipPartition)
-            {
-                continue;
-            }
-
-            // Deserialize clustering keys - if moved to new CQL row - and update 'values' Object[] array
-            ByteBuffer columnNameBuf = Objects.requireNonNull(rowData.getColumnName(), "ColumnName buffer in Rid is null, this is unexpected");
-            maybeRebuildClusteringKeys(columnNameBuf);
-
-            // Deserialize CQL field column name
-            ByteBuffer component = ByteBufferUtils.extractComponent(columnNameBuf, cqlTable.numClusteringKeys());
-            String columnName = component != null ? FastThreadLocalUtf8Decoder.stringThrowRuntime(component) : null;
-            if (columnName == null || columnName.isEmpty())
-            {
-                if (!hasProjectedValueColumns || !scanner.hasMoreColumns())
-                {
-                    if (hasProjectedValueColumns)
-                    {
-                        // null out the value of the cell for the case where we have projected value columns
-                        values[values.length - 1] = null;
-                    }
-                    // We use the position of a cell for a value column that is projected, or zero if no value
-                    // columns are projected. The column we find is irrelevant because if we fall under this
-                    // condition it means that we are in a situation where the row has only PK + CK, but no
-                    // regular columns.
-                    next = new Cell(values, firstProjectedValueColumnPositionOrZero, newRow, rowData.getTimestamp());
-                    return true;
-                }
-
-                continue;
-            }
-
-            CqlField field = cqlTable.getField(columnName);
-            if (field == null)
-            {
-                LOGGER.warn("Ignoring unknown column columnName='{}'", columnName);
-                continue;
-            }
-
-            // Deserialize value field or static column and update 'values' Object[] array
-            deserializeField(field);
-
-            // Static column, so continue reading entire CQL row before returning
-            if (field.isStaticColumn())
-            {
-                continue;
-            }
-
-            // Update next Cell
-            next = new Cell(values, field.position(), newRow, rowData.getTimestamp());
-
-            return true;
-        }
-
-        // Finished so close
-        next = null;
-        close();
-        return false;
+        return this.sparkTypes[field.position()].equals(obj1, obj2);
     }
 
     @Override
-    public void close() throws IOException
-    {
-        if (!closed)
-        {
-            scanner.close();
-            closed = true;
-            long runtimeNanos = System.nanoTime() - startTimeNanos;
-            LOGGER.info("Closed CompactionScanner runtimeNanos={}", 
runtimeNanos);
-            stats.closedSparkCellIterator(runtimeNanos);
-        }
-    }
-
-    /* Iterator Helpers */
-
-    /**
-     * If it is a new partition see if we can skip (e.g. if partition outside Spark worker token range), otherwise re-build partition keys
-     */
-    private void maybeRebuildPartition()
-    {
-        if (!rowData.isNewPartition())
-        {
-            return;
-        }
-
-        // Skip partitions not in the token range for this Spark partition
-        newRow = true;
-
-        for (CqlField field : cqlTable.staticColumns())
-        {
-            // We need to reset static columns between partitions, if a static column is null/not-populated
-            // in the next partition, then the previous value might be carried across
-            values[field.position()] = null;
-        }
-
-        skipPartition = !dataLayer.isInPartition(partitionId, rowData.getToken(), rowData.getPartitionKey());
-        if (skipPartition)
-        {
-            stats.skippedPartitionInIterator(rowData.getPartitionKey(), rowData.getToken());
-            return;
-        }
-
-        // Or new partition, so deserialize partition keys and update 'values' array
-        readPartitionKey(sparkSqlTypeConverter, rowData.getPartitionKey(), cqlTable, this.values, stats);
-    }
-
-    public static void readPartitionKey(SparkSqlTypeConverter sparkSqlTypeConverter,
-                                        ByteBuffer partitionKey,
-                                        CqlTable table,
-                                        Object[] values,
-                                        Stats stats)
-    {
-        if (table.numPartitionKeys() == 1)
-        {
-            // Not a composite partition key
-            CqlField field = table.partitionKeys().get(0);
-            values[field.position()] = deserialize(sparkSqlTypeConverter, field, partitionKey, stats);
-        }
-        else
-        {
-            // Split composite partition keys
-            ByteBuffer[] partitionKeyBufs = ByteBufferUtils.split(partitionKey, table.numPartitionKeys());
-            int index = 0;
-            for (CqlField field : table.partitionKeys())
-            {
-                values[field.position()] = deserialize(sparkSqlTypeConverter, field, partitionKeyBufs[index++], stats);
-            }
-        }
-    }
-
-    /**
-     * Deserialize clustering key components and update 'values' array if changed. Mark isNewRow true if we move to new CQL row.
-     */
-    private void maybeRebuildClusteringKeys(@NotNull ByteBuffer columnNameBuf)
+    protected String decodeString(@Nullable ByteBuffer buffer)
     {
-        List<CqlField> clusteringKeys = cqlTable.clusteringKeys();
-        if (clusteringKeys.isEmpty())
-        {
-            return;
-        }
-
-        int index = 0;
-        for (CqlField field : clusteringKeys)
-        {
-            Object newObj = deserialize(field, ByteBufferUtils.extractComponent(columnNameBuf, index++));
-            Object oldObj = values[field.position()];
-            // Historically, we compare equality of clustering keys using the Spark types
-            // to determine if we have moved to a new 'row'. We could also compare using the Cassandra types
-            // or the raw ByteBuffers before converting to Spark types - this might be slightly more performant.
-            if (newRow || oldObj == null || newObj == null || !sparkTypes[field.position()].equals(newObj, oldObj))
-            {
-                newRow = true;
-                values[field.position()] = newObj;
-            }
-        }
-    }
-
-    /**
-     * Deserialize value field if required and update 'values' array
-     */
-    private void deserializeField(@NotNull CqlField field)
-    {
-        if (columnFilter == null || columnFilter.includeColumn(field.name()))
-        {
-            // Deserialize value
-            Object value = deserialize(field, rowData.getValue());
-
-            if (field.isStaticColumn())
-            {
-                values[field.position()] = value;
-                return;
-            }
-
-            values[values.length - 1] = value;  // Last index in array always stores the cell value
-        }
-    }
-
-    private Object deserialize(CqlField field, ByteBuffer buffer)
-    {
-        return deserialize(sparkSqlTypeConverter, field, buffer, stats);
-    }
-
-    private static Object deserialize(SparkSqlTypeConverter sparkSqlTypeConverter, CqlField field, ByteBuffer buffer, Stats stats)
-    {
-        long now = System.nanoTime();
-        Object value = buffer == null ? null : field.deserializeToType(sparkSqlTypeConverter, buffer);
-        stats.fieldDeserialization(field, System.nanoTime() - now);
-        return value;
-    }
-
-    private int maybeGetPositionOfFirstProjectedValueColumnOrZero()
-    {
-        // find the position of the first value column that is projected
-        for (CqlField valueColumn : cqlTable.valueColumns())
-        {
-            if (columnFilter == null || columnFilter.includeColumn(valueColumn.name()))
-            {
-                return valueColumn.position();
-            }
-        }
-        return 0;
+        return buffer != null ? FastThreadLocalUtf8Decoder.stringThrowRuntime(buffer) : null;
     }
 }
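
A note on the partition-key handling moved by this refactor: readPartitionKey()
splits a composite partition key buffer into one component per partition-key
column before deserializing each component, while single-column keys skip the
split. Below is a minimal, self-contained sketch of that split, assuming the
standard Cassandra composite-key encoding of an unsigned 2-byte length, the
component bytes, then one end-of-component byte; CompositeKeySplit is a
hypothetical stand-in, not the project's ByteBufferUtils:

    import java.nio.ByteBuffer;

    // Hypothetical demo class; mirrors the idea behind ByteBufferUtils.split(...)
    public final class CompositeKeySplit
    {
        // Split a composite partition key into its per-column components
        public static ByteBuffer[] split(ByteBuffer key, int numComponents)
        {
            ByteBuffer[] components = new ByteBuffer[numComponents];
            ByteBuffer buffer = key.duplicate();  // leave the caller's position untouched
            for (int index = 0; index < numComponents; index++)
            {
                int length = buffer.getShort() & 0xFFFF;  // unsigned 2-byte length
                ByteBuffer component = buffer.slice();
                component.limit(length);                  // component bytes only
                components[index] = component;
                buffer.position(buffer.position() + length + 1);  // skip bytes + end-of-component byte
            }
            return components;
        }

        public static void main(String[] args)
        {
            // Hand-encode a composite key of ("a", "bc") and split it back
            ByteBuffer key = ByteBuffer.allocate(9);
            key.putShort((short) 1).put((byte) 'a').put((byte) 0);
            key.putShort((short) 2).put((byte) 'b').put((byte) 'c').put((byte) 0);
            key.flip();
            for (ByteBuffer component : split(key, 2))
            {
                System.out.println(component.remaining());  // prints 1, then 2
            }
        }
    }

Each component buffer is then handed to the per-field deserializer, as in the
deserialize(...) helper shown above.
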
diff --git a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
index d77d980..c870dd5 100644
--- a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.sparksql;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -28,6 +29,7 @@ import org.apache.cassandra.spark.config.SchemaFeature;
 import org.apache.cassandra.spark.data.DataLayer;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
@@ -36,7 +38,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL rows
  */
-public class SparkRowIterator extends AbstractSparkRowIterator implements InputPartitionReader<InternalRow>
+public class SparkRowIterator extends AbstractSparkRowIterator<InternalRow> implements InputPartitionReader<InternalRow>
 {
     @VisibleForTesting
     public SparkRowIterator(int partitionId, @NotNull DataLayer dataLayer)
@@ -49,19 +51,37 @@ public class SparkRowIterator extends AbstractSparkRowIterator implements InputP
                                @Nullable StructType requiredSchema,
                               @NotNull List<PartitionKeyFilter> partitionKeyFilters)
     {
-        super(partitionId, dataLayer, requiredSchema, partitionKeyFilters);
+        super(partitionId,
+              dataLayer,
+              requiredSchema,
+              partitionKeyFilters,
+              (builder) -> decorate(builder, dataLayer.requestedFeatures()));
     }
 
     @Override
     @NotNull
-    RowBuilder newBuilder()
+    protected RowBuilder<InternalRow> newBuilder(Function<RowBuilder<InternalRow>, RowBuilder<InternalRow>> decorator)
     {
-        RowBuilder builder = new FullRowBuilder(cqlTable, hasProjectedValueColumns);
-        for (SchemaFeature feature : requestedFeatures)
+        RowBuilder<InternalRow> builder = new FullRowBuilder<>(it.cqlTable(), it.hasProjectedValueColumns(), this::rowBuilder);
+        builder = decorator.apply(builder);
+        builder.reset();
+        return builder;
+    }
+
+    protected static RowBuilder<InternalRow> decorate(RowBuilder<InternalRow> builder,
+                                                      List<SchemaFeature> features)
+    {
+        for (SchemaFeature feature : features)
         {
             builder = feature.decorate(builder);
         }
-        builder.reset();
+
         return builder;
     }
+
+    @Override
+    public InternalRow rowBuilder(Object[] result)
+    {
+        return new GenericInternalRow(result);
+    }
 }
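
The decorate(...) helper above folds each requested SchemaFeature over the base
builder, so rows ultimately pass through a chain of wrappers around
FullRowBuilder. A toy, self-contained sketch of that fold, with generic names
standing in for the project's types:

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.UnaryOperator;

    // Toy illustration of chaining feature decorators over a base builder
    public final class DecoratorChainDemo
    {
        interface Builder
        {
            String build();
        }

        public static void main(String[] args)
        {
            Builder base = () -> "row";
            // Each "feature" wraps the current builder, like feature.decorate(builder)
            List<UnaryOperator<Builder>> features = Arrays.asList(
                inner -> () -> inner.build() + "+lastModified",
                inner -> () -> inner.build() + "+anotherFeature");

            Builder decorated = base;
            for (UnaryOperator<Builder> feature : features)
            {
                decorated = feature.apply(decorated);
            }
            System.out.println(decorated.build());  // row+lastModified+anotherFeature
        }
    }
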
diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java
index 99a61fe..d78ed5f 100644
--- a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java
+++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java
@@ -70,7 +70,7 @@ public class PartitionSizeIterator implements PartitionReader<InternalRow>
             IndexEntry entry = it.data();
             Object[] values = new Object[numPartitionKeys + 2];
 
-            SparkCellIterator.readPartitionKey(sparkSqlTypeConverter, entry.getPartitionKey(), cqlTable, values, stats);
+            CellIterator.readPartitionKey(sparkSqlTypeConverter, entry.getPartitionKey(), cqlTable, values, stats);
             values[numPartitionKeys] = entry.getUncompressed();
             values[numPartitionKeys + 1] = entry.getCompressed();
 
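For context on the values array built above: partition-key components occupy
the first numPartitionKeys slots, followed by the uncompressed and compressed
partition sizes. A hypothetical illustration of the layout, with made-up
values:

    import java.util.Arrays;

    // Made-up values; only the slot layout matters here
    public final class PartitionSizeRowDemo
    {
        public static void main(String[] args)
        {
            int numPartitionKeys = 2;
            Object[] values = new Object[numPartitionKeys + 2];
            values[0] = "pk-component-0";          // deserialized partition-key columns
            values[1] = "pk-component-1";
            values[numPartitionKeys] = 4096L;      // uncompressed size
            values[numPartitionKeys + 1] = 1024L;  // compressed size
            System.out.println(Arrays.toString(values));
        }
    }
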
diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
index c4c786b..d076b34 100644
--- a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
@@ -21,21 +21,18 @@ package org.apache.cassandra.spark.sparksql;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.spark.config.SchemaFeature;
-import org.apache.cassandra.spark.data.CqlField;
-import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.DataLayer;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.connector.read.PartitionReader;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -43,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL rows
  */
-public class SparkRowIterator extends AbstractSparkRowIterator implements PartitionReader<InternalRow>
+public class SparkRowIterator extends AbstractSparkRowIterator<GenericInternalRow> implements PartitionReader<InternalRow>
 {
     @VisibleForTesting
     public SparkRowIterator(int partitionId, @NotNull DataLayer dataLayer)
@@ -51,138 +48,40 @@ public class SparkRowIterator extends AbstractSparkRowIterator implements Partit
         this(partitionId, dataLayer, null, new ArrayList<>());
     }
 
-    protected SparkRowIterator(int partitionId,
-                               @NotNull DataLayer dataLayer,
-                               @Nullable StructType columnFilter,
-                               @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    public SparkRowIterator(int partitionId,
+                            @NotNull DataLayer dataLayer,
+                            @Nullable StructType requiredSchema,
+                            @NotNull List<PartitionKeyFilter> partitionKeyFilters)
     {
-        super(partitionId, dataLayer, columnFilter, partitionKeyFilters);
+        super(
+        partitionId,
+        dataLayer,
+        requiredSchema,
+        partitionKeyFilters,
+        (builder) -> decorate(requiredSchema, builder, dataLayer.requestedFeatures())
+        );
     }
 
-    @Override
-    @NotNull
-    RowBuilder newBuilder()
+    protected static RowBuilder<GenericInternalRow> decorate(@Nullable StructType requiredSchema,
+                                                             RowBuilder<GenericInternalRow> builder,
+                                                             List<SchemaFeature> features)
     {
-        RowBuilder builder;
-        String[] fieldNames = null;
-        if (columnFilter != null)
-        {
-            builder = new PartialRowBuilder(columnFilter, cqlTable, hasProjectedValueColumns);
-            fieldNames = columnFilter.fieldNames();
-        }
-        else
-        {
-            builder = new FullRowBuilder(cqlTable, hasProjectedValueColumns);
-        }
-
-        for (SchemaFeature feature : requestedFeatures)
+        Set<String> fieldNames = requiredSchema == null ? null : new HashSet<>(Arrays.asList(requiredSchema.fieldNames()));
+        for (SchemaFeature feature : features)
         {
             // Only decorate when there is no column filter or when the field is requested in the query,
             // otherwise we skip decoration
-            if (columnFilter == null || Arrays.stream(fieldNames).anyMatch(feature.fieldName()::equals))
+            if (fieldNames == null || fieldNames.contains(feature.fieldName()))
             {
                 builder = feature.decorate(builder);
             }
         }
 
-        builder.reset();
         return builder;
     }
 
-    /**
-     * PartialRowBuilder builds a row containing only the fields in the requiredSchema prune-column filter.
-     * NOTE: Spark 3 changed the contract from Spark 2 and requires us to only return the columns specified in
-     * the requiredSchema 'prune column' filter and not a sparse Object[] array with null values for excluded columns
-     */
-    static class PartialRowBuilder extends FullRowBuilder
+    public GenericInternalRow rowBuilder(Object[] valueArray)
     {
-        private final int[] positionsMap;
-        private final boolean hasAllNonValueColumns;
-        private final StructType requiredSchema;
-
-        PartialRowBuilder(@NotNull StructType requiredSchema,
-                          CqlTable table,
-                          boolean hasProjectedValueColumns)
-        {
-            super(table, hasProjectedValueColumns);
-            this.requiredSchema = requiredSchema;
-            Set<String> requiredColumns = Arrays.stream(requiredSchema.fields())
-                                                .map(StructField::name)
-                                                .collect(Collectors.toSet());
-            hasAllNonValueColumns = table.fields().stream()
-                                         .filter(CqlField::isNonValueColumn)
-                                         .map(CqlField::name)
-                                         .allMatch(requiredColumns::contains);
-
-            // Map original column position to new position in requiredSchema
-            positionsMap = IntStream.range(0, table.numFields())
-                                    .map(position -> -1)
-                                    .toArray();
-            int position = 0;
-            for (StructField structField : requiredSchema.fields())
-            {
-                CqlField field = table.getField(structField.name());
-                if (field != null)  // Field might be last modified timestamp
-                {
-                    positionsMap[field.position()] = position++;
-                }
-            }
-        }
-
-        @Override
-        public void reset()
-        {
-            count = 0;
-            int totalColumns = requiredSchema.size();
-            if (totalColumns > 0)
-            {
-                result = new Object[totalColumns];
-            }
-            else
-            {
-                result = EMPTY_RESULT;
-            }
-        }
-
-        @Override
-        public int fieldIndex(String name)
-        {
-            return requiredSchema != null ? requiredSchema.fieldIndex(name) : super.fieldIndex(name);
-        }
-
-        @Override
-        public void copyKeys(Cell cell)
-        {
-            if (hasAllNonValueColumns)
-            {
-                // Optimization: if we are returning all primary key/static columns we can use the super method
-                super.copyKeys(cell);
-                return;
-            }
-
-            // Otherwise we need to only return columns requested and map to new position in result array
-            int length = !hasProjectedValueColumns ? cell.values.length : cell.values.length - 1;
-            for (int index = 0; index < length; index++)
-            {
-                int position = positionsMap[index];
-                if (position >= 0)
-                {
-                    result[position] = cell.values[index];
-                }
-            }
-            count += length;
-        }
-
-        @Override
-        public void copyValue(Cell cell)
-        {
-            // Copy the next value column, mapping it to its new position
-            int position = positionsMap[cell.position];
-            if (position >= 0)
-            {
-                result[position] = cell.values[cell.values.length - 1];
-            }
-            count++;
-        }
+        return new GenericInternalRow(valueArray);
     }
 }
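
The PartialRowBuilder logic removed here keeps a positionsMap from each
original column position to the column's index in the pruned requiredSchema,
with -1 marking pruned columns. A minimal sketch of building such a map, using
hypothetical column names:

    import java.util.Arrays;
    import java.util.List;

    // Toy version of the prune-column position mapping
    public final class PositionMapDemo
    {
        public static void main(String[] args)
        {
            List<String> tableColumns = Arrays.asList("pk", "ck", "a", "b", "c");
            List<String> requiredColumns = Arrays.asList("pk", "b");

            int[] positionsMap = new int[tableColumns.size()];
            Arrays.fill(positionsMap, -1);  // -1 == column pruned from the result
            int position = 0;
            for (String required : requiredColumns)
            {
                int original = tableColumns.indexOf(required);
                if (original >= 0)  // skip non-table fields such as feature columns
                {
                    positionsMap[original] = position++;
                }
            }
            System.out.println(Arrays.toString(positionsMap));  // [0, -1, -1, 1, -1]
        }
    }
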
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeature.java b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeature.java
index 30b03f7..968b03d 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeature.java
+++ b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeature.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.sparksql.RowBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
@@ -58,7 +59,7 @@ public interface SchemaFeature
      * @param builder the row builder
      * @return a new decorated builder
      */
-    RowBuilder decorate(RowBuilder builder);
+    <T extends InternalRow> RowBuilder<T> decorate(RowBuilder<T> builder);
 
     /**
      * The option name used in the Spark options
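
The point of making decorate generic in <T extends InternalRow> is that
decoration preserves the caller's concrete row type, so the Spark 2
(InternalRow) and Spark 3 (GenericInternalRow) iterators can share one
SchemaFeature contract without casts. A toy sketch with stand-in types:

    // Stand-in types; not Spark's InternalRow hierarchy
    public final class GenericDecorateDemo
    {
        interface Row
        {
        }

        static final class SpecialRow implements Row
        {
        }

        interface Builder<T extends Row>
        {
            T build();
        }

        // Generic decorate: the returned builder keeps the caller's row type T
        static <T extends Row> Builder<T> decorate(Builder<T> delegate)
        {
            return () -> {
                T row = delegate.build();
                // ... a real decorator would append its feature column here ...
                return row;
            };
        }

        public static void main(String[] args)
        {
            Builder<SpecialRow> base = SpecialRow::new;
            Builder<SpecialRow> decorated = decorate(base);  // no cast needed
            System.out.println(decorated.build().getClass().getSimpleName());
        }
    }
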
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeatureSet.java b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeatureSet.java
index 2f08486..671fbdc 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeatureSet.java
+++ b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeatureSet.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 
 import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator;
 import org.apache.cassandra.spark.sparksql.RowBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 
@@ -43,9 +44,9 @@ public enum SchemaFeatureSet implements SchemaFeature
             }
 
             @Override
-            public RowBuilder decorate(RowBuilder builder)
+            public <T extends InternalRow> RowBuilder<T> decorate(RowBuilder<T> builder)
             {
-                return new LastModifiedTimestampDecorator(builder, fieldName());
+                return new LastModifiedTimestampDecorator<>(builder, fieldName());
             }
         };
 
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/LastModifiedTimestampDecorator.java b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/LastModifiedTimestampDecorator.java
index 5414838..fa1a279 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/LastModifiedTimestampDecorator.java
+++ b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/LastModifiedTimestampDecorator.java
@@ -19,17 +19,18 @@
 
 package org.apache.cassandra.spark.sparksql;
 
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.InternalRow;
 
 /**
  * Wrap a builder to append last modified timestamp
+ * @param <T> type of row returned by this builder
  */
-public class LastModifiedTimestampDecorator extends RowBuilderDecorator
+public class LastModifiedTimestampDecorator<T extends InternalRow> extends RowBuilderDecorator<T>
 {
     private final int lmtColumnPosition;
     private long lastModified = 0L;
 
-    public LastModifiedTimestampDecorator(RowBuilder delegate, String fieldName)
+    public LastModifiedTimestampDecorator(RowBuilder<T> delegate, String fieldName)
     {
         super(delegate);
         int width = internalExpandRow();
@@ -60,7 +61,7 @@ public class LastModifiedTimestampDecorator extends RowBuilderDecorator
     }
 
     @Override
-    public GenericInternalRow build()
+    public T build()
     {
         // Append last modified timestamp
         Object[] result = array();
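
As the hunk above shows, the decorator reserves a slot for the timestamp
(internalExpandRow() at construction) and fills it when build() runs. A
simplified, self-contained sketch of that append-on-build pattern; here the
widening happens at build time rather than up front, and ToyRowBuilder is a
stand-in for the project's RowBuilder:

    import java.util.Arrays;

    // Simplified append-on-build decorator; toy types only
    public final class LmtDecoratorDemo
    {
        interface ToyRowBuilder
        {
            Object[] build();
        }

        static ToyRowBuilder appendLastModified(ToyRowBuilder delegate, long timestamp)
        {
            return () -> {
                Object[] inner = delegate.build();
                Object[] widened = Arrays.copyOf(inner, inner.length + 1);
                widened[widened.length - 1] = timestamp;  // the reserved feature slot
                return widened;
            };
        }

        public static void main(String[] args)
        {
            ToyRowBuilder base = () -> new Object[]{"pk", "ck", "value"};
            ToyRowBuilder decorated = appendLastModified(base, 1738200000000L);
            System.out.println(Arrays.toString(decorated.build()));
        }
    }
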
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilderDecorator.java b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilderDecorator.java
index 7dfbc59..dfbc9af 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilderDecorator.java
+++ b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilderDecorator.java
@@ -20,13 +20,13 @@
 package org.apache.cassandra.spark.sparksql;
 
 import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.InternalRow;
 
-abstract class RowBuilderDecorator implements RowBuilder
+abstract class RowBuilderDecorator<T extends InternalRow> implements RowBuilder<T>
 {
-    protected final RowBuilder delegate;
+    protected final RowBuilder<T> delegate;
 
-    RowBuilderDecorator(RowBuilder delegate)
+    RowBuilderDecorator(RowBuilder<T> delegate)
     {
         this.delegate = delegate;
     }
@@ -116,7 +116,7 @@ abstract class RowBuilderDecorator implements RowBuilder
     }
 
     @Override
-    public GenericInternalRow build()
+    public T build()
     {
         return delegate.build();
     }
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
index 8b3883b..db63df8 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
@@ -247,7 +247,7 @@ public abstract class AbstractStreamScanner implements StreamScanner<RowData>, C
                         return true;
                     }
 
-                    // For non-compact tables, set up a ClusteringColumnDataState to emit a Rid that emulates a
+                    // For non-compact tables, set up a ClusteringColumnDataState to emit a RowData that emulates a
                     // pre-3.0 CQL row marker. This is necessary for backwards compatibility with 2.1 & 2.0 output,
                     // and also for tables with only primary key columns defined.
                     // An empty PKLI is the 3.0 equivalent of having no row marker (e.g. row modifications via
@@ -319,7 +319,7 @@ public abstract class AbstractStreamScanner implements StreamScanner<RowData>, C
 
     /**
      * Maps clustering values to column data, to emulate CQL row markers which were removed in Cassandra 3.0,
-     * but which we must still emit Rid for in order to preserve backwards compatibility
+     * but which we must still emit RowData for in order to preserve backwards compatibility
      * and to handle tables containing only primary key columns
      */
     protected final class ClusteringColumnDataState implements ColumnDataState


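On the row-marker emulation above: when a row carries only primary-key
columns, there is no regular column data to emit, so the scanner synthesizes
one entry from the clustering values to keep the row visible downstream. A toy
illustration of that idea, with hypothetical names:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    // Toy illustration: synthesize a marker entry when a row has no regular columns
    public final class RowMarkerDemo
    {
        static List<String> cellsForRow(List<String> clusteringValues, List<String> regularCells)
        {
            if (regularCells.isEmpty())
            {
                // Emulate the pre-3.0 row marker so the row is not dropped
                return Collections.singletonList("marker:" + String.join("|", clusteringValues));
            }
            return regularCells;
        }

        public static void main(String[] args)
        {
            System.out.println(cellsForRow(Arrays.asList("ck1", "ck2"), Collections.emptyList()));
            System.out.println(cellsForRow(Arrays.asList("ck1"), Arrays.asList("a=1")));
        }
    }
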
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
