This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new ac63b83 CASSANDRA-20259: Refactor to decouple RowIterator and CellIterator fr… (#97)
ac63b83 is described below
commit ac63b8309119a47c14818962770e48e6bbfed25e
Author: jberragan <[email protected]>
AuthorDate: Wed Jan 29 21:11:23 2025 -0800
CASSANDRA-20259: Refactor to decouple RowIterator and CellIterator fr… (#97)
Patch by James Berragan; Reviewed by Francisco Guerrero, Yifan Cai for CASSANDRA-20259
---
CHANGES.txt | 1 +
.../apache/cassandra/analytics/stats/Stats.java | 0
.../org/apache/cassandra/spark/reader/RowData.java | 2 +-
.../cassandra/spark/reader/StreamScanner.java | 2 +-
.../org/apache/cassandra/spark/sparksql/Cell.java | 10 +-
.../cassandra/spark/sparksql/CellIterator.java | 132 ++++----
.../cassandra/spark/sparksql/FullRowBuilder.java | 39 ++-
.../spark/sparksql/PartialRowBuilder.java | 136 ++++++++
.../cassandra/spark/sparksql/RowBuilder.java | 5 +-
.../cassandra/spark/sparksql/RowIterator.java | 105 +++----
.../cassandra/spark/data/CassandraDataLayer.java | 7 +-
.../spark/sparksql/AbstractSparkRowIterator.java | 121 ++------
.../spark/sparksql/SparkCellIterator.java | 341 ++-------------------
.../cassandra/spark/sparksql/SparkRowIterator.java | 32 +-
.../spark/sparksql/PartitionSizeIterator.java | 2 +-
.../cassandra/spark/sparksql/SparkRowIterator.java | 145 ++-------
.../cassandra/spark/config/SchemaFeature.java | 3 +-
.../cassandra/spark/config/SchemaFeatureSet.java | 5 +-
.../sparksql/LastModifiedTimestampDecorator.java | 9 +-
.../spark/sparksql/RowBuilderDecorator.java | 10 +-
.../spark/reader/AbstractStreamScanner.java | 4 +-
21 files changed, 411 insertions(+), 700 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 557f828..b941a3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Refactor to decouple RowIterator and CellIterator from Spark so bulk reads can be performed outside of Spark (CASSANDRA-20259)
 * CEP-44 Kafka integration for Cassandra CDC using Sidecar (CASSANDRA-19962)
 * Expose detailed bulk write failure message for better insight (CASSANDRA-20066)
 * Add dataTransferApi and TwoPhaseImportCoordinator for coordinated write (CASSANDRA-19994)
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/analytics/stats/Stats.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/analytics/stats/Stats.java
similarity index 100%
rename from cassandra-bridge/src/main/java/org/apache/cassandra/analytics/stats/Stats.java
rename to cassandra-analytics-common/src/main/java/org/apache/cassandra/analytics/stats/Stats.java
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/RowData.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/RowData.java
index 4983216..b77295f 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/RowData.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/RowData.java
@@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
/**
- * Rid - Row Identifier - contains the partition key, clustering keys and column name that uniquely identifies a row and column of data in Cassandra
+ * RowData contains the partition key, clustering keys and column name that uniquely identifies a row and column of data in Cassandra
*/
public class RowData
{
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
index 322f9f0..5f5a667 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
@@ -48,7 +48,7 @@ import java.io.IOException;
* at which point the implementation should make a copy of the provided bytes.
* <p>
 * Upon return from the next() call the current values of the scanner can be obtained by calling
- * the methods in Rid, getPartitionKey(), getColumnName(), getValue().
+ * the methods getPartitionKey(), getColumnName(), getValue().
*
* @param <T> type of object returned by rid() method.
*/
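For orientation, the scanner contract described above boils down to a simple drain loop; CellIterator.getNext() later in this patch follows the same shape. A minimal sketch, assuming an already-open scanner (the 'drain' helper below is hypothetical and not part of this patch):

    import java.io.IOException;

    import org.apache.cassandra.spark.reader.RowData;
    import org.apache.cassandra.spark.reader.StreamScanner;

    final class ScannerLoop
    {
        // Drain a StreamScanner<RowData>: data() returns a holder whose values are
        // updated in place; next() positions the scanner on the next values and
        // advanceToNextColumn() moves across the cells of the current row.
        static void drain(StreamScanner<RowData> scanner) throws IOException
        {
            RowData rowData = scanner.data();
            while (scanner.next())
            {
                scanner.advanceToNextColumn();
                // ByteBuffers exposed by rowData are only valid until the next
                // call to next(), so copy them if they must be retained
                System.out.println(rowData.getTimestamp());
            }
            scanner.close();
        }
    }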
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/Cell.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/Cell.java
index eb8e2d7..c22beb5 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/Cell.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/Cell.java
@@ -19,12 +19,12 @@
package org.apache.cassandra.spark.sparksql;
-class Cell
+public class Cell
{
- final Object[] values;
- final int position;
- final boolean isNewRow;
- final long timestamp;
+ public final Object[] values;
+ public final int position;
+ public final boolean isNewRow;
+ public final long timestamp;
Cell(Object[] values, int position, boolean isNewRow, long timestamp)
{
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
similarity index 75%
copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
copy to cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
index 5247550..971b545 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/CellIterator.java
@@ -20,47 +20,41 @@
package org.apache.cassandra.spark.sparksql;
import java.io.IOException;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.nio.charset.CharacterCodingException;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Collectors;
+import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.data.converter.types.SparkType;
+import org.apache.cassandra.spark.data.TypeConverter;
import org.apache.cassandra.spark.reader.RowData;
-import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter;
import org.apache.cassandra.spark.reader.StreamScanner;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
-import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.spark.utils.ByteBufferUtils;
-import org.apache.cassandra.spark.utils.FastThreadLocalUtf8Decoder;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
 * Iterate through CompactionIterator, deserializing ByteBuffers and normalizing into Object[] array in column order
*/
-public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
+public abstract class CellIterator implements Iterator<Cell>, AutoCloseable
{
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkCellIterator.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(CellIterator.class);
- protected final DataLayer dataLayer;
private final Stats stats;
- private final CqlTable cqlTable;
+ protected final CqlTable cqlTable;
private final Object[] values;
- private final SparkType[] sparkTypes;
@Nullable
- protected final PruneColumnFilter columnFilter;
+ private final PruneColumnFilter columnFilter;
private final long startTimeNanos;
@NotNull
private final StreamScanner<RowData> scanner;
@@ -76,19 +70,35 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
protected final int partitionId;
protected final int firstProjectedValueColumnPositionOrZero;
- protected final boolean hasProjectedValueColumns;
- private final SparkSqlTypeConverter sparkSqlTypeConverter;
+ private final boolean hasProjectedValueColumns;
+ protected final TypeConverter typeConverter;
+
+    public interface ScannerSupplier
+    {
+        /**
+         * @param partitionId         arbitrary id uniquely identifying this partition of the bulk read
+         * @param partitionKeyFilters list of partition key filters to push down
+         * @param columnFilter        optional column filter to only read certain columns
+         * @return a StreamScanner to iterate over each cell of the data.
+         */
+        StreamScanner<RowData> get(int partitionId,
+                                   @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                                   @Nullable PruneColumnFilter columnFilter);
+    }
-    public SparkCellIterator(int partitionId,
-                             @NotNull DataLayer dataLayer,
-                             @Nullable StructType requiredSchema,
-                             @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    public CellIterator(int partitionId,
+                        CqlTable cqlTable,
+                        Stats stats,
+                        TypeConverter typeConverter,
+                        @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                        Function<CqlTable, PruneColumnFilter> columnFilterSupplier,
+                        ScannerSupplier scannerSupplier)
{
this.partitionId = partitionId;
- this.dataLayer = dataLayer;
- stats = dataLayer.stats();
- cqlTable = dataLayer.cqlTable();
- columnFilter = buildColumnFilter(requiredSchema, cqlTable);
+ this.stats = stats;
+ this.cqlTable = cqlTable;
+ this.typeConverter = typeConverter;
+ this.columnFilter = columnFilterSupplier.apply(cqlTable);
if (columnFilter != null)
{
LOGGER.info("Adding prune column filter columns='{}'",
String.join(",", columnFilter.requiredColumns()));
@@ -106,37 +116,18 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
// Open compaction scanner
startTimeNanos = System.nanoTime();
previousTimeNanos = startTimeNanos;
- scanner = openScanner(partitionId, partitionKeyFilters);
+        scanner = scannerSupplier.get(partitionId, partitionKeyFilters, columnFilter);
long openTimeNanos = System.nanoTime() - startTimeNanos;
LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
stats.openedCompactionScanner(openTimeNanos);
rowData = scanner.data();
stats.openedSparkCellIterator();
        firstProjectedValueColumnPositionOrZero = maybeGetPositionOfFirstProjectedValueColumnOrZero();
-
- sparkSqlTypeConverter = dataLayer.typeConverter();
- sparkTypes = new SparkType[cqlTable.numFields()];
- for (int index = 0; index < cqlTable.numFields(); index++)
- {
-            sparkTypes[index] = sparkSqlTypeConverter.toSparkType(cqlTable.field(index).type());
- }
}
-    protected StreamScanner<RowData> openScanner(int partitionId,
-                                                 @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    public CqlTable cqlTable()
     {
-        return dataLayer.openCompactionScanner(partitionId, partitionKeyFilters, columnFilter);
- }
-
- @Nullable
-    static PruneColumnFilter buildColumnFilter(@Nullable StructType requiredSchema, @NotNull CqlTable cqlTable)
- {
- return requiredSchema != null
- ? new PruneColumnFilter(Arrays.stream(requiredSchema.fields())
- .map(StructField::name)
- .filter(cqlTable::has)
- .collect(Collectors.toSet()))
- : null;
+ return cqlTable;
}
public boolean hasProjectedValueColumns()
@@ -197,12 +188,12 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
}
            // Deserialize clustering keys - if moved to new CQL row - and update 'values' Object[] array
-            ByteBuffer columnNameBuf = Objects.requireNonNull(rowData.getColumnName(), "ColumnName buffer in Rid is null, this is unexpected");
+            ByteBuffer columnNameBuf = Objects.requireNonNull(rowData.getColumnName(), "ColumnName buffer in RowData is null, this is unexpected");
            maybeRebuildClusteringKeys(columnNameBuf);
            // Deserialize CQL field column name
            ByteBuffer component = ByteBufferUtils.extractComponent(columnNameBuf, cqlTable.numClusteringKeys());
-            String columnName = component != null ? FastThreadLocalUtf8Decoder.stringThrowRuntime(component) : null;
+            String columnName = decodeString(component);
if (columnName == null || columnName.isEmpty())
{
if (!hasProjectedValueColumns || !scanner.hasMoreColumns())
@@ -251,6 +242,11 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
return false;
}
+    protected String decodeString(@Nullable ByteBuffer buffer) throws CharacterCodingException
+ {
+ return buffer == null ? null : ByteBufferUtils.string(buffer);
+ }
+
@Override
public void close() throws IOException
{
@@ -286,7 +282,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
values[field.position()] = null;
}
-        skipPartition = !dataLayer.isInPartition(partitionId, rowData.getToken(), rowData.getPartitionKey());
+        skipPartition = !isInPartition(partitionId, rowData.getToken(), rowData.getPartitionKey());
        if (skipPartition)
        {
            stats.skippedPartitionInIterator(rowData.getPartitionKey(), rowData.getToken());
@@ -294,10 +290,30 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
}
        // Or new partition, so deserialize partition keys and update 'values' array
-        readPartitionKey(sparkSqlTypeConverter, rowData.getPartitionKey(), cqlTable, this.values, stats);
+        readPartitionKey(typeConverter, rowData.getPartitionKey(), cqlTable, this.values, stats);
    }
-    public static void readPartitionKey(SparkSqlTypeConverter sparkSqlTypeConverter,
+    /**
+     * @param partitionId  partition id of this bulk reader partition
+     * @param token        Cassandra token of partition key
+     * @param partitionKey raw ByteBuffer of partition key
+     * @return true if this partitionKey is within the token range of this bulk reader partition.
+     * Partition keys outside the token range can be ignored by this iterator.
+     */
+    public abstract boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey);
+
+    /**
+     * Return true if two values are equal for a given CqlField. This is primarily used to
+     * detect if the iterator has transitioned to a new clustering key (and so a new primary key) and should emit a new row.
+     *
+     * @param field CqlField of the column being compared.
+     * @param obj1  first value
+     * @param obj2  second value
+     * @return true if obj1 equals obj2 for a given CqlType.
+     */
+    public abstract boolean equals(CqlField field, Object obj1, Object obj2);
+
+ public static void readPartitionKey(TypeConverter typeConverter,
ByteBuffer partitionKey,
CqlTable table,
Object[] values,
@@ -307,7 +323,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
{
// Not a composite partition key
CqlField field = table.partitionKeys().get(0);
-            values[field.position()] = deserialize(sparkSqlTypeConverter, field, partitionKey, stats);
+            values[field.position()] = deserialize(typeConverter, field, partitionKey, stats);
}
else
{
@@ -316,7 +332,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
int index = 0;
for (CqlField field : table.partitionKeys())
{
-                values[field.position()] = deserialize(sparkSqlTypeConverter, field, partitionKeyBufs[index++], stats);
+                values[field.position()] = deserialize(typeConverter, field, partitionKeyBufs[index++], stats);
}
}
}
@@ -340,7 +356,7 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
            // Historically, we compare equality of clustering keys using the Spark types
            // to determine if we have moved to a new 'row'. We could also compare using the Cassandra types
            // or the raw ByteBuffers before converting to Spark types - this might be slightly more performant.
-            if (newRow || oldObj == null || newObj == null || !sparkTypes[field.position()].equals(newObj, oldObj))
+            if (newRow || oldObj == null || newObj == null || !equals(field, newObj, oldObj))
{
newRow = true;
values[field.position()] = newObj;
@@ -370,13 +386,13 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
private Object deserialize(CqlField field, ByteBuffer buffer)
{
- return deserialize(sparkSqlTypeConverter, field, buffer, stats);
+ return deserialize(typeConverter, field, buffer, stats);
}
-    private static Object deserialize(SparkSqlTypeConverter sparkSqlTypeConverter, CqlField field, ByteBuffer buffer, Stats stats)
+    private static Object deserialize(TypeConverter typeConverter, CqlField field, ByteBuffer buffer, Stats stats)
{
long now = System.nanoTime();
-        Object value = buffer == null ? null : field.deserializeToType(sparkSqlTypeConverter, buffer);
+        Object value = buffer == null ? null : field.deserializeToType(typeConverter, buffer);
stats.fieldDeserialization(field, System.nanoTime() - now);
return value;
}
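The payoff of the new abstract class is that a bulk reader no longer needs Spark on the classpath: a consumer provides the table metadata, a TypeConverter, a ScannerSupplier, and the two abstract methods. A rough sketch of such a subclass under those assumptions (PlainCellIterator is a hypothetical name, not part of this patch):

    import java.math.BigInteger;
    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.Objects;

    import org.apache.cassandra.analytics.stats.Stats;
    import org.apache.cassandra.spark.data.CqlField;
    import org.apache.cassandra.spark.data.CqlTable;
    import org.apache.cassandra.spark.data.TypeConverter;
    import org.apache.cassandra.spark.sparksql.CellIterator;
    import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;

    public class PlainCellIterator extends CellIterator
    {
        public PlainCellIterator(int partitionId,
                                 CqlTable cqlTable,
                                 Stats stats,
                                 TypeConverter typeConverter,
                                 List<PartitionKeyFilter> partitionKeyFilters,
                                 ScannerSupplier scannerSupplier)
        {
            // No prune-column filter: read every column of the table
            super(partitionId, cqlTable, stats, typeConverter, partitionKeyFilters,
                  table -> null, scannerSupplier);
        }

        @Override
        public boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey)
        {
            return true; // a single-process reader owns the entire token range
        }

        @Override
        public boolean equals(CqlField field, Object obj1, Object obj2)
        {
            return Objects.equals(obj1, obj2); // compare deserialized values directly
        }
    }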
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
similarity index 81%
rename from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
rename to cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
index 5a06343..630328c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/FullRowBuilder.java
@@ -19,34 +19,36 @@
package org.apache.cassandra.spark.sparksql;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.IntStream;
+import java.util.Optional;
+import java.util.function.Function;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
/**
 * FullRowBuilder expects all fields in the schema to be returned, i.e. no prune column filter
+ *
+ * @param <T> type of row returned by builder
*/
-class FullRowBuilder implements RowBuilder
+public class FullRowBuilder<T> implements RowBuilder<T>
{
static final Object[] EMPTY_RESULT = new Object[0];
- final int numColumns;
- final int numCells;
- final boolean hasProjectedValueColumns;
- int extraColumns;
- Object[] result;
- int count;
+ protected final int numColumns;
+ protected final int numCells;
+ protected final boolean hasProjectedValueColumns;
+ protected int extraColumns;
+ protected Object[] result;
+ protected int count;
private final CqlTable cqlTable;
+ protected final Function<Object[], T> rowBuilder;
- FullRowBuilder(CqlTable cqlTable, boolean hasProjectedValueColumns)
+    FullRowBuilder(CqlTable cqlTable, boolean hasProjectedValueColumns, Function<Object[], T> rowBuilder)
{
this.cqlTable = cqlTable;
this.numColumns = cqlTable.numFields();
this.hasProjectedValueColumns = hasProjectedValueColumns;
        this.numCells = cqlTable.numNonValueColumns() + (hasProjectedValueColumns ? 1 : 0);
+ this.rowBuilder = rowBuilder;
}
@Override
@@ -133,16 +135,13 @@ class FullRowBuilder implements RowBuilder
@Override
public int fieldIndex(String name)
{
- List<CqlField> fields = cqlTable.fields();
- return IntStream.range(0, fields.size())
-                        .filter(i -> Objects.equals(fields.get(i).name(), name))
- .findFirst()
- .orElse(-1);
+ return Optional.ofNullable(cqlTable.getField(name))
+ .map(CqlField::position)
+ .orElse(-1);
}
- @Override
- public GenericInternalRow build()
+ public T build()
{
- return new GenericInternalRow(result);
+ return rowBuilder.apply(result);
}
}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
new file mode 100644
index 0000000..6df672c
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/PartialRowBuilder.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.cassandra.spark.data.CqlField;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * PartialRowBuilder that builds row only containing fields in requiredSchema prune-column filter
+ * NOTE: Spark 3 changed the contract from Spark 2 and requires us to only return the columns specified in
+ * the requiredSchema 'prune column' filter and not a sparse Object[] array with null values for excluded columns
+ *
+ * @param <T> type of row returned by builder
+ */
+public class PartialRowBuilder<T> extends FullRowBuilder<T>
+{
+ private final int[] positionsMap;
+ private final boolean hasAllNonValueColumns;
+ protected final String[] requiredSchema;
+ private final Map<String, Integer> columnIndex;
+
+ public PartialRowBuilder(@NotNull String[] requiredSchema,
+ CqlTable table,
+ boolean hasProjectedValueColumns,
+ Function<Object[], T> rowBuilder)
+ {
+ super(table, hasProjectedValueColumns, rowBuilder);
+ this.requiredSchema = requiredSchema;
+ this.columnIndex = new HashMap<>(requiredSchema.length);
+ for (int i = 0; i < requiredSchema.length; i++)
+ {
+ columnIndex.put(requiredSchema[i], i);
+ }
+        Set<String> requiredColumns = Arrays.stream(requiredSchema).collect(Collectors.toSet());
+ hasAllNonValueColumns = table.fields().stream()
+ .filter(CqlField::isNonValueColumn)
+ .map(CqlField::name)
+ .allMatch(requiredColumns::contains);
+
+ // Map original column position to new position in requiredSchema
+ positionsMap = IntStream.range(0, table.numFields())
+ .map(position -> -1)
+ .toArray();
+ int position = 0;
+ for (String fieldName : requiredSchema)
+ {
+ CqlField field = table.getField(fieldName);
+ if (field != null) // Field might be last modified timestamp
+ {
+ positionsMap[field.position()] = position++;
+ }
+ }
+ }
+
+ @Override
+ public void reset()
+ {
+ count = 0;
+ int totalColumns = requiredSchema.length;
+ if (totalColumns > 0)
+ {
+ result = new Object[totalColumns];
+ }
+ else
+ {
+ result = EMPTY_RESULT;
+ }
+ }
+
+ @Override
+ public int fieldIndex(String name)
+ {
+        return requiredSchema != null ? columnIndex.get(name) : super.fieldIndex(name);
+ }
+
+ @Override
+ public void copyKeys(Cell cell)
+ {
+ if (hasAllNonValueColumns)
+ {
+            // Optimization: if we are returning all primary key/static columns, we can use the super method
+ super.copyKeys(cell);
+ return;
+ }
+
+        // Otherwise we need to only return columns requested and map to new position in result array
+        int length = !hasProjectedValueColumns ? cell.values.length : cell.values.length - 1;
+ for (int index = 0; index < length; index++)
+ {
+ int position = positionsMap[index];
+ if (position >= 0)
+ {
+ result[position] = cell.values[index];
+ }
+ }
+ count += length;
+ }
+
+ @Override
+ public void copyValue(Cell cell)
+ {
+        // Copy the next value column, mapping the column to its new position
+ int position = positionsMap[cell.position];
+ if (position >= 0)
+ {
+ result[position] = cell.values[cell.values.length - 1];
+ }
+ count++;
+ }
+}
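To make the position mapping concrete: for a hypothetical table with fields a, b, c, d at positions 0-3 and requiredSchema = ["c", "a"], the constructor above produces positionsMap = [1, -1, 0, -1], and copyKeys/copyValue then route each cell value to its pruned slot (illustrative values only, not from the patch):

    public final class PositionsMapExample
    {
        public static void main(String[] args)
        {
            // positionsMap[tablePosition] == index in the pruned result, or -1 if excluded
            int[] positionsMap = {1, -1, 0, -1};
            Object[] cellValues = {"a-val", "b-val", "c-val", "d-val"};
            Object[] result = new Object[2];
            for (int index = 0; index < cellValues.length; index++)
            {
                int position = positionsMap[index];
                if (position >= 0)
                {
                    result[position] = cellValues[index];
                }
            }
            // prints [c-val, a-val]: columns come back in requiredSchema order
            System.out.println(java.util.Arrays.toString(result));
        }
    }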
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
similarity index 91%
rename from cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
rename to cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
index e6cc452..8ec14f2 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilder.java
@@ -20,9 +20,8 @@
package org.apache.cassandra.spark.sparksql;
import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-public interface RowBuilder
+public interface RowBuilder<T>
{
CqlTable getCqlTable();
@@ -54,5 +53,5 @@ public interface RowBuilder
*/
int expandRow(int extraColumns);
- GenericInternalRow build();
+ T build();
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
similarity index 51%
copy from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
copy to cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
index 1fdffd0..6d3382c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/sparksql/RowIterator.java
@@ -20,85 +20,73 @@
package org.apache.cassandra.spark.sparksql;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.cassandra.spark.config.SchemaFeature;
-import org.apache.cassandra.spark.data.CqlField;
-import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
+import java.util.function.Function;
+
import org.apache.cassandra.analytics.stats.Stats;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL rows
+ * Wrapper iterator around CellIterator to normalize cells into Spark SQL rows
+ *
+ * @param <T> type of row returned by Iterator.
*/
-abstract class AbstractSparkRowIterator
+public abstract class RowIterator<T>
{
private final Stats stats;
- private final SparkCellIterator it;
+ protected final CellIterator it;
private final long openTimeNanos;
- private final RowBuilder builder;
+ private final RowBuilder<T> builder;
- protected final List<SchemaFeature> requestedFeatures;
- protected final CqlTable cqlTable;
- protected final StructType columnFilter;
- protected final boolean hasProjectedValueColumns;
+    // NOTE: requiredColumns might contain additional decorated fields like last_modified_timestamp,
+    // but PruneColumnFilter is pushed down to the SSTableReader so only contains the real table fields
+ @Nullable
+ protected final String[] requiredColumns;
+ protected T row;
private Cell cell = null;
- private InternalRow row = null;
-    AbstractSparkRowIterator(int partitionId,
-                             @NotNull DataLayer dataLayer,
-                             @Nullable StructType requiredSchema,
-                             @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    protected RowIterator(CellIterator it,
+                          Stats stats,
+                          @Nullable String[] requiredColumns,
+                          Function<RowBuilder<T>, RowBuilder<T>> decorator)
{
- this.stats = dataLayer.stats();
- this.cqlTable = dataLayer.cqlTable();
-        this.columnFilter = useColumnFilter(requiredSchema, cqlTable) ? requiredSchema : null;
-        this.it = buildCellIterator(partitionId, dataLayer, columnFilter, partitionKeyFilters);
- this.stats.openedSparkRowIterator();
+ this.stats = stats;
+ this.it = it;
this.openTimeNanos = System.nanoTime();
- this.requestedFeatures = dataLayer.requestedFeatures();
- this.hasProjectedValueColumns = it.hasProjectedValueColumns();
- this.builder = newBuilder();
- }
-
-    protected SparkCellIterator buildCellIterator(int partitionId,
-                                                  @NotNull DataLayer dataLayer,
-                                                  @Nullable StructType columnFilter,
-                                                  @NotNull List<PartitionKeyFilter> partitionKeyFilters)
-    {
-        return new SparkCellIterator(partitionId, dataLayer, columnFilter, partitionKeyFilters);
+ this.requiredColumns = requiredColumns;
+ this.builder = newBuilder(decorator);
}
-    private static boolean useColumnFilter(@Nullable StructType requiredSchema, CqlTable cqlTable)
+ @NotNull
+    protected RowBuilder<T> newBuilder(Function<RowBuilder<T>, RowBuilder<T>> decorator)
{
- if (requiredSchema == null)
+ RowBuilder<T> builder;
+ if (requiredColumns != null)
{
- return false;
+ builder = newPartialBuilder();
+ }
+ else
+ {
+ builder = newFullRowBuilder();
}
- // Only use column filter if it excludes any of the CqlTable fields
-        Set<String> requiredFields = Arrays.stream(requiredSchema.fields()).map(StructField::name).collect(Collectors.toSet());
- return cqlTable.fields().stream()
- .map(CqlField::name)
- .anyMatch(field -> !requiredFields.contains(field));
- }
- abstract RowBuilder newBuilder();
+ builder = decorator.apply(builder);
- public InternalRow get()
- {
- return row;
+ builder.reset();
+ return builder;
}
+    /**
+     * @return an instance of a PartialRowBuilder that builds a row with a subset of columns in the schema and maps to the generic type.
+     */
+    public abstract PartialRowBuilder<T> newPartialBuilder();
+
+    /**
+     * @return an instance of a FullRowBuilder that builds a row with all columns in the schema and maps to the generic type.
+     */
+    public abstract FullRowBuilder<T> newFullRowBuilder();
+
public boolean next() throws IOException
{
        // We are finished if not already reading a row (if cell != null, it can happen if previous row was incomplete)
@@ -132,7 +120,7 @@ abstract class AbstractSparkRowIterator
builder.onCell(cell);
- if (hasProjectedValueColumns)
+ if (it.hasProjectedValueColumns())
{
// If schema has value column
builder.copyValue(cell);
@@ -154,4 +142,9 @@ abstract class AbstractSparkRowIterator
stats.closedSparkRowIterator(System.nanoTime() - openTimeNanos);
it.close();
}
+
+ public T get()
+ {
+ return row;
+ }
}
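Whatever the row type T, the generic RowIterator above is consumed the same way via its next()/get() pair; a short sketch (the 'drain' helper is hypothetical, not part of this patch):

    import java.io.IOException;

    import org.apache.cassandra.spark.sparksql.RowIterator;

    final class RowLoop
    {
        // Pull every row out of a RowIterator<T>: next() assembles the cells of
        // one CQL row through the active RowBuilder and get() returns the result.
        static <T> void drain(RowIterator<T> iterator) throws IOException
        {
            while (iterator.next())
            {
                T row = iterator.get();
                System.out.println(row);
            }
            iterator.close();
        }
    }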
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
index 804af94..983cd8c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java
@@ -62,6 +62,7 @@ import o.a.c.sidecar.client.shaded.common.response.ListSnapshotFilesResponse;
import o.a.c.sidecar.client.shaded.common.response.NodeSettings;
import o.a.c.sidecar.client.shaded.common.response.RingResponse;
import o.a.c.sidecar.client.shaded.common.response.SchemaResponse;
+import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.bridge.BigNumberConfig;
import org.apache.cassandra.bridge.BigNumberConfigImpl;
import org.apache.cassandra.bridge.CassandraBridge;
@@ -86,7 +87,6 @@ import org.apache.cassandra.spark.data.partitioner.Partitioner;
import org.apache.cassandra.spark.data.partitioner.TokenPartitioner;
import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator;
import org.apache.cassandra.spark.sparksql.RowBuilder;
-import org.apache.cassandra.analytics.stats.Stats;
import org.apache.cassandra.spark.utils.CqlUtils;
import org.apache.cassandra.spark.utils.ReaderTimeProvider;
import org.apache.cassandra.spark.utils.ScalaFunctions;
@@ -96,6 +96,7 @@ import org.apache.cassandra.spark.validation.CassandraValidation;
import org.apache.cassandra.spark.validation.SidecarValidation;
import org.apache.cassandra.spark.validation.StartupValidatable;
import org.apache.cassandra.spark.validation.StartupValidator;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.util.ShutdownHookManager;
import org.jetbrains.annotations.NotNull;
@@ -1017,9 +1018,9 @@ public class CassandraDataLayer extends PartitionedDataLayer implements StartupV
}
@Override
- public RowBuilder decorate(RowBuilder builder)
+            public <T extends InternalRow> RowBuilder<T> decorate(RowBuilder<T> builder)
{
- return new LastModifiedTimestampDecorator(builder, alias);
+ return new LastModifiedTimestampDecorator<>(builder, alias);
}
@Override
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
index 1fdffd0..85fd17b 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/AbstractSparkRowIterator.java
@@ -19,19 +19,17 @@
package org.apache.cassandra.spark.sparksql;
-import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
-import org.apache.cassandra.spark.config.SchemaFeature;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.DataLayer;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
-import org.apache.cassandra.analytics.stats.Stats;
-import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
@@ -39,43 +37,32 @@ import org.jetbrains.annotations.Nullable;
/**
 * Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL rows
+ *
+ * @param <T> type of row returned by Iterator.
*/
-abstract class AbstractSparkRowIterator
+abstract class AbstractSparkRowIterator<T> extends RowIterator<T>
{
- private final Stats stats;
- private final SparkCellIterator it;
- private final long openTimeNanos;
- private final RowBuilder builder;
-
- protected final List<SchemaFeature> requestedFeatures;
- protected final CqlTable cqlTable;
- protected final StructType columnFilter;
- protected final boolean hasProjectedValueColumns;
-
- private Cell cell = null;
- private InternalRow row = null;
-
    AbstractSparkRowIterator(int partitionId,
                             @NotNull DataLayer dataLayer,
                             @Nullable StructType requiredSchema,
-                             @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+                             @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                             Function<RowBuilder<T>, RowBuilder<T>> decorator)
{
-        this.stats = dataLayer.stats();
-        this.cqlTable = dataLayer.cqlTable();
-        this.columnFilter = useColumnFilter(requiredSchema, cqlTable) ? requiredSchema : null;
-        this.it = buildCellIterator(partitionId, dataLayer, columnFilter, partitionKeyFilters);
-        this.stats.openedSparkRowIterator();
-        this.openTimeNanos = System.nanoTime();
-        this.requestedFeatures = dataLayer.requestedFeatures();
-        this.hasProjectedValueColumns = it.hasProjectedValueColumns();
-        this.builder = newBuilder();
+        super(
+            buildCellIterator(partitionId, dataLayer.cqlTable(), requiredSchema, dataLayer, partitionKeyFilters),
+            dataLayer.stats(),
+            requiredSchema == null ? null : requiredSchema.fieldNames(),
+            decorator
+        );
}
-    protected SparkCellIterator buildCellIterator(int partitionId,
-                                                  @NotNull DataLayer dataLayer,
-                                                  @Nullable StructType columnFilter,
-                                                  @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+    protected static CellIterator buildCellIterator(int partitionId,
+                                                    CqlTable cqlTable,
+                                                    @Nullable StructType requiredSchema,
+                                                    @NotNull DataLayer dataLayer,
+                                                    @NotNull List<PartitionKeyFilter> partitionKeyFilters)
    {
+        StructType columnFilter = useColumnFilter(requiredSchema, cqlTable) ? requiredSchema : null;
        return new SparkCellIterator(partitionId, dataLayer, columnFilter, partitionKeyFilters);
    }
@@ -92,66 +79,24 @@ abstract class AbstractSparkRowIterator
.anyMatch(field -> !requiredFields.contains(field));
}
- abstract RowBuilder newBuilder();
+    /**
+     * Maps the Object[] valueArray generated by the RowIterator to the expected type
+     *
+     * @param valueArray array of deserialized values for the row
+     * @return a value of type `T`
+     */
+    public abstract T rowBuilder(Object[] valueArray);
- public InternalRow get()
+ @Override
+ public PartialRowBuilder<T> newPartialBuilder()
{
- return row;
- }
-
- public boolean next() throws IOException
- {
-        // We are finished if not already reading a row (if cell != null, it can happen if previous row was incomplete)
- // and SparkCellIterator has no next value
- if (cell == null && !it.hasNextThrows())
- {
- return false;
- }
-
-        // Pivot values to normalize each cell into single SparkSQL or 'CQL' type row
- do
- {
- if (cell == null)
- {
- // Read next cell
- cell = it.next();
- }
-
- if (builder.isFirstCell())
- {
-                // On first iteration, copy all partition keys, clustering keys, static columns
- assert cell.isNewRow;
- builder.copyKeys(cell);
- }
- else if (cell.isNewRow)
- {
-                // Current row is incomplete, so we have moved to new row before reaching end
-                // break out to return current incomplete row and handle next row in next iteration
- break;
- }
-
- builder.onCell(cell);
-
- if (hasProjectedValueColumns)
- {
- // If schema has value column
- builder.copyValue(cell);
- }
- cell = null;
- // Keep reading more cells until we read the entire row
- } while (builder.hasMoreCells() && it.hasNextThrows());
-
- // Build row and reset builder for next row
- row = builder.build();
- builder.reset();
-
- stats.nextRow();
- return true;
+        Objects.requireNonNull(requiredColumns, "requiredColumns must be non-null for PartialRowBuilder");
+        return new PartialRowBuilder<>(requiredColumns, it.cqlTable(), it.hasProjectedValueColumns(), this::rowBuilder);
}
- public void close() throws IOException
+ @Override
+ public FullRowBuilder<T> newFullRowBuilder()
{
- stats.closedSparkRowIterator(System.nanoTime() - openTimeNanos);
- it.close();
+        return new FullRowBuilder<>(it.cqlTable(), it.hasProjectedValueColumns(), this::rowBuilder);
}
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
index 5247550..e5d164c 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
@@ -19,115 +19,51 @@
package org.apache.cassandra.spark.sparksql;
-import java.io.IOException;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
-import java.util.Objects;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.data.converter.types.SparkType;
-import org.apache.cassandra.spark.reader.RowData;
import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter;
-import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.data.converter.types.SparkType;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
-import org.apache.cassandra.analytics.stats.Stats;
-import org.apache.cassandra.spark.utils.ByteBufferUtils;
import org.apache.cassandra.spark.utils.FastThreadLocalUtf8Decoder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-/**
- * Iterate through CompactionIterator, deserializing ByteBuffers and normalizing into Object[] array in column order
- */
-public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
+public class SparkCellIterator extends CellIterator
{
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkCellIterator.class);
-
- protected final DataLayer dataLayer;
- private final Stats stats;
- private final CqlTable cqlTable;
- private final Object[] values;
+ private final DataLayer dataLayer;
private final SparkType[] sparkTypes;
- @Nullable
- protected final PruneColumnFilter columnFilter;
- private final long startTimeNanos;
- @NotNull
- private final StreamScanner<RowData> scanner;
- @NotNull
- private final RowData rowData;
-
- // Mutable Iterator State
- private boolean skipPartition = false;
- private boolean newRow = false;
- private boolean closed = false;
- private Cell next = null;
- private long previousTimeNanos;
-
- protected final int partitionId;
- protected final int firstProjectedValueColumnPositionOrZero;
- protected final boolean hasProjectedValueColumns;
- private final SparkSqlTypeConverter sparkSqlTypeConverter;
public SparkCellIterator(int partitionId,
@NotNull DataLayer dataLayer,
@Nullable StructType requiredSchema,
                             @NotNull List<PartitionKeyFilter> partitionKeyFilters)
{
- this.partitionId = partitionId;
+ super(partitionId,
+ dataLayer.cqlTable(),
+ dataLayer.stats(),
+ dataLayer.typeConverter(),
+ partitionKeyFilters,
+ (cqlTable) -> buildColumnFilter(requiredSchema, cqlTable),
+ dataLayer::openCompactionScanner);
this.dataLayer = dataLayer;
- stats = dataLayer.stats();
- cqlTable = dataLayer.cqlTable();
- columnFilter = buildColumnFilter(requiredSchema, cqlTable);
- if (columnFilter != null)
- {
- LOGGER.info("Adding prune column filter columns='{}'",
String.join(",", columnFilter.requiredColumns()));
- }
-
- hasProjectedValueColumns = cqlTable.numValueColumns() > 0 &&
- cqlTable.valueColumns()
- .stream()
-                                   .anyMatch(field -> columnFilter == null || columnFilter.requiredColumns().contains(field.name()));
-
-        // The value array copies across all the partition/clustering/static columns
-        // and the single column value for this cell to the SparkRowIterator
-        values = new Object[cqlTable.numNonValueColumns() + (hasProjectedValueColumns ? 1 : 0)];
-
- // Open compaction scanner
- startTimeNanos = System.nanoTime();
- previousTimeNanos = startTimeNanos;
- scanner = openScanner(partitionId, partitionKeyFilters);
- long openTimeNanos = System.nanoTime() - startTimeNanos;
- LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
- stats.openedCompactionScanner(openTimeNanos);
- rowData = scanner.data();
- stats.openedSparkCellIterator();
-        firstProjectedValueColumnPositionOrZero = maybeGetPositionOfFirstProjectedValueColumnOrZero();
-
- sparkSqlTypeConverter = dataLayer.typeConverter();
- sparkTypes = new SparkType[cqlTable.numFields()];
+ this.sparkTypes = new SparkType[cqlTable.numFields()];
+        SparkSqlTypeConverter sparkSqlTypeConverter = ((SparkSqlTypeConverter) this.typeConverter);
for (int index = 0; index < cqlTable.numFields(); index++)
{
-            sparkTypes[index] = sparkSqlTypeConverter.toSparkType(cqlTable.field(index).type());
+            this.sparkTypes[index] = sparkSqlTypeConverter.toSparkType(cqlTable.field(index).type());
}
}
- protected StreamScanner<RowData> openScanner(int partitionId,
-                                                 @NotNull List<PartitionKeyFilter> partitionKeyFilters)
-    {
-        return dataLayer.openCompactionScanner(partitionId, partitionKeyFilters, columnFilter);
- }
-
@Nullable
    static PruneColumnFilter buildColumnFilter(@Nullable StructType requiredSchema, @NotNull CqlTable cqlTable)
{
@@ -139,258 +75,21 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
: null;
}
- public boolean hasProjectedValueColumns()
- {
- return hasProjectedValueColumns;
- }
-
@Override
- public boolean hasNext()
+    public boolean isInPartition(int partitionId, BigInteger token, ByteBuffer partitionKey)
{
- try
- {
- return hasNextThrows();
- }
- catch (IOException exception)
- {
- throw new RuntimeException(exception);
- }
- }
-
- public boolean hasNextThrows() throws IOException
- {
- if (next != null || closed)
- {
- return !closed;
- }
- return getNext();
+ return dataLayer.isInPartition(partitionId, token, partitionKey);
}
@Override
- public Cell next()
+ public boolean equals(CqlField field, Object obj1, Object obj2)
{
- Cell result = next;
- assert result != null;
- next = null;
- newRow = false;
- long now = System.nanoTime();
- stats.nextCell(now - previousTimeNanos);
- previousTimeNanos = now;
- return result;
- }
-
- private boolean getNext() throws IOException
- {
- while (scanner.next())
- {
-            // If hasNext returns true, it indicates the partition keys has been loaded into the rid.
-            // Therefore, let's try to rebuild partition.
-            // Deserialize partition keys - if we have moved to a new partition - and update 'values' Object[] array.
- maybeRebuildPartition();
-
- scanner.advanceToNextColumn();
-
-            // Skip partition e.g. if token is outside of Spark worker token range
- if (skipPartition)
- {
- continue;
- }
-
-            // Deserialize clustering keys - if moved to new CQL row - and update 'values' Object[] array
-            ByteBuffer columnNameBuf = Objects.requireNonNull(rowData.getColumnName(), "ColumnName buffer in Rid is null, this is unexpected");
- maybeRebuildClusteringKeys(columnNameBuf);
-
- // Deserialize CQL field column name
-            ByteBuffer component = ByteBufferUtils.extractComponent(columnNameBuf, cqlTable.numClusteringKeys());
-            String columnName = component != null ? FastThreadLocalUtf8Decoder.stringThrowRuntime(component) : null;
- if (columnName == null || columnName.isEmpty())
- {
- if (!hasProjectedValueColumns || !scanner.hasMoreColumns())
- {
- if (hasProjectedValueColumns)
- {
-                    // null out the value of the cell for the case where we have projected value columns
- values[values.length - 1] = null;
- }
-                    // We use the position of a cell for a value column that is projected, or zero if no value
-                    // columns are projected. The column we find is irrelevant because if we fall under this
-                    // condition it means that we are in a situation where the row has only PK + CK, but no
-                    // regular columns.
-                    next = new Cell(values, firstProjectedValueColumnPositionOrZero, newRow, rowData.getTimestamp());
- return true;
- }
-
- continue;
- }
-
- CqlField field = cqlTable.getField(columnName);
- if (field == null)
- {
-                LOGGER.warn("Ignoring unknown column columnName='{}'", columnName);
- continue;
- }
-
-            // Deserialize value field or static column and update 'values' Object[] array
- deserializeField(field);
-
-            // Static column, so continue reading entire CQL row before returning
- if (field.isStaticColumn())
- {
- continue;
- }
-
- // Update next Cell
-            next = new Cell(values, field.position(), newRow, rowData.getTimestamp());
-
- return true;
- }
-
- // Finished so close
- next = null;
- close();
- return false;
+ return this.sparkTypes[field.position()].equals(obj1, obj2);
}
@Override
- public void close() throws IOException
- {
- if (!closed)
- {
- scanner.close();
- closed = true;
- long runtimeNanos = System.nanoTime() - startTimeNanos;
- LOGGER.info("Closed CompactionScanner runtimeNanos={}",
runtimeNanos);
- stats.closedSparkCellIterator(runtimeNanos);
- }
- }
-
- /* Iterator Helpers */
-
- /**
-     * If it is a new partition see if we can skip (e.g. if partition outside Spark worker token range), otherwise re-build partition keys
- */
- private void maybeRebuildPartition()
- {
- if (!rowData.isNewPartition())
- {
- return;
- }
-
- // Skip partitions not in the token range for this Spark partition
- newRow = true;
-
- for (CqlField field : cqlTable.staticColumns())
- {
-            // We need to reset static columns between partitions, if a static column is null/not-populated
-            // in the next partition, then the previous value might be carried across
- values[field.position()] = null;
- }
-
-        skipPartition = !dataLayer.isInPartition(partitionId, rowData.getToken(), rowData.getPartitionKey());
- if (skipPartition)
- {
-            stats.skippedPartitionInIterator(rowData.getPartitionKey(), rowData.getToken());
- return;
- }
-
-        // Or new partition, so deserialize partition keys and update 'values' array
-        readPartitionKey(sparkSqlTypeConverter, rowData.getPartitionKey(), cqlTable, this.values, stats);
- }
-
-    public static void readPartitionKey(SparkSqlTypeConverter sparkSqlTypeConverter,
- ByteBuffer partitionKey,
- CqlTable table,
- Object[] values,
- Stats stats)
- {
- if (table.numPartitionKeys() == 1)
- {
- // Not a composite partition key
- CqlField field = table.partitionKeys().get(0);
-            values[field.position()] = deserialize(sparkSqlTypeConverter, field, partitionKey, stats);
- }
- else
- {
- // Split composite partition keys
-            ByteBuffer[] partitionKeyBufs = ByteBufferUtils.split(partitionKey, table.numPartitionKeys());
- int index = 0;
- for (CqlField field : table.partitionKeys())
- {
-                values[field.position()] = deserialize(sparkSqlTypeConverter, field, partitionKeyBufs[index++], stats);
- }
- }
- }
-
- /**
-     * Deserialize clustering key components and update 'values' array if changed. Mark isNewRow true if we move to new CQL row.
- */
- private void maybeRebuildClusteringKeys(@NotNull ByteBuffer columnNameBuf)
+ protected String decodeString(@Nullable ByteBuffer buffer)
{
- List<CqlField> clusteringKeys = cqlTable.clusteringKeys();
- if (clusteringKeys.isEmpty())
- {
- return;
- }
-
- int index = 0;
- for (CqlField field : clusteringKeys)
- {
-            Object newObj = deserialize(field, ByteBufferUtils.extractComponent(columnNameBuf, index++));
- Object oldObj = values[field.position()];
-            // Historically, we compare equality of clustering keys using the Spark types
-            // to determine if we have moved to a new 'row'. We could also compare using the Cassandra types
-            // or the raw ByteBuffers before converting to Spark types - this might be slightly more performant.
-            if (newRow || oldObj == null || newObj == null || !sparkTypes[field.position()].equals(newObj, oldObj))
- {
- newRow = true;
- values[field.position()] = newObj;
- }
- }
- }
-
- /**
- * Deserialize value field if required and update 'values' array
- */
- private void deserializeField(@NotNull CqlField field)
- {
- if (columnFilter == null || columnFilter.includeColumn(field.name()))
- {
- // Deserialize value
- Object value = deserialize(field, rowData.getValue());
-
- if (field.isStaticColumn())
- {
- values[field.position()] = value;
- return;
- }
-
-            values[values.length - 1] = value; // Last index in array always stores the cell value
- }
- }
-
- private Object deserialize(CqlField field, ByteBuffer buffer)
- {
- return deserialize(sparkSqlTypeConverter, field, buffer, stats);
- }
-
-    private static Object deserialize(SparkSqlTypeConverter sparkSqlTypeConverter, CqlField field, ByteBuffer buffer, Stats stats)
- {
- long now = System.nanoTime();
-        Object value = buffer == null ? null : field.deserializeToType(sparkSqlTypeConverter, buffer);
- stats.fieldDeserialization(field, System.nanoTime() - now);
- return value;
- }
-
- private int maybeGetPositionOfFirstProjectedValueColumnOrZero()
- {
- // find the position of the first value column that is projected
- for (CqlField valueColumn : cqlTable.valueColumns())
- {
-            if (columnFilter == null || columnFilter.includeColumn(valueColumn.name()))
- {
- return valueColumn.position();
- }
- }
- return 0;
+        return buffer != null ? FastThreadLocalUtf8Decoder.stringThrowRuntime(buffer) : null;
}
}
diff --git a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
index d77d980..c870dd5 100644
--- a/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/spark2/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark.sparksql;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
@@ -28,6 +29,7 @@ import org.apache.cassandra.spark.config.SchemaFeature;
import org.apache.cassandra.spark.data.DataLayer;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
@@ -36,7 +38,7 @@ import org.jetbrains.annotations.Nullable;
/**
 * Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL rows
*/
-public class SparkRowIterator extends AbstractSparkRowIterator implements InputPartitionReader<InternalRow>
+public class SparkRowIterator extends AbstractSparkRowIterator<InternalRow> implements InputPartitionReader<InternalRow>
{
@VisibleForTesting
public SparkRowIterator(int partitionId, @NotNull DataLayer dataLayer)
@@ -49,19 +51,37 @@ public class SparkRowIterator extends AbstractSparkRowIterator implements InputP
                            @Nullable StructType requiredSchema,
                            @NotNull List<PartitionKeyFilter> partitionKeyFilters)
{
- super(partitionId, dataLayer, requiredSchema, partitionKeyFilters);
+ super(partitionId,
+ dataLayer,
+ requiredSchema,
+ partitionKeyFilters,
+ (builder) -> decorate(builder, dataLayer.requestedFeatures()));
}
@Override
@NotNull
- RowBuilder newBuilder()
+    protected RowBuilder<InternalRow> newBuilder(Function<RowBuilder<InternalRow>, RowBuilder<InternalRow>> decorator)
{
-        RowBuilder builder = new FullRowBuilder(cqlTable, hasProjectedValueColumns);
-        for (SchemaFeature feature : requestedFeatures)
+        RowBuilder<InternalRow> builder = new FullRowBuilder<>(it.cqlTable(), it.hasProjectedValueColumns(), this::rowBuilder);
+ builder = decorator.apply(builder);
+ builder.reset();
+ return builder;
+ }
+
+    protected static RowBuilder<InternalRow> decorate(RowBuilder<InternalRow> builder,
+                                                      List<SchemaFeature> features)
+ {
+ for (SchemaFeature feature : features)
{
builder = feature.decorate(builder);
}
- builder.reset();
+
return builder;
}
+
+ @Override
+ public InternalRow rowBuilder(Object[] result)
+ {
+ return new GenericInternalRow((result));
+ }
}
diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java
index 99a61fe..d78ed5f 100644
--- a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java
+++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java
@@ -70,7 +70,7 @@ public class PartitionSizeIterator implements PartitionReader<InternalRow>
IndexEntry entry = it.data();
Object[] values = new Object[numPartitionKeys + 2];
-        SparkCellIterator.readPartitionKey(sparkSqlTypeConverter, entry.getPartitionKey(), cqlTable, values, stats);
+        CellIterator.readPartitionKey(sparkSqlTypeConverter, entry.getPartitionKey(), cqlTable, values, stats);
values[numPartitionKeys] = entry.getUncompressed();
values[numPartitionKeys + 1] = entry.getCompressed();
diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
index c4c786b..d076b34 100644
--- a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
+++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/SparkRowIterator.java
@@ -21,21 +21,18 @@ package org.apache.cassandra.spark.sparksql;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.spark.config.SchemaFeature;
-import org.apache.cassandra.spark.data.CqlField;
-import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.data.DataLayer;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
-import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -43,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Wrapper iterator around SparkCellIterator to normalize cells into Spark SQL rows
*/
-public class SparkRowIterator extends AbstractSparkRowIterator implements PartitionReader<InternalRow>
+public class SparkRowIterator extends AbstractSparkRowIterator<GenericInternalRow> implements PartitionReader<InternalRow>
{
@VisibleForTesting
public SparkRowIterator(int partitionId, @NotNull DataLayer dataLayer)
@@ -51,138 +48,40 @@ public class SparkRowIterator extends AbstractSparkRowIterator implements Partit
this(partitionId, dataLayer, null, new ArrayList<>());
}
- protected SparkRowIterator(int partitionId,
- @NotNull DataLayer dataLayer,
- @Nullable StructType columnFilter,
-                               @NotNull List<PartitionKeyFilter> partitionKeyFilters)
+ public SparkRowIterator(int partitionId,
+ @NotNull DataLayer dataLayer,
+ @Nullable StructType requiredSchema,
+                            @NotNull List<PartitionKeyFilter> partitionKeyFilters)
{
- super(partitionId, dataLayer, columnFilter, partitionKeyFilters);
+ super(
+ partitionId,
+ dataLayer,
+ requiredSchema,
+ partitionKeyFilters,
+            (builder) -> decorate(requiredSchema, builder, dataLayer.requestedFeatures())
+ );
}
- @Override
- @NotNull
- RowBuilder newBuilder()
+    protected static RowBuilder<GenericInternalRow> decorate(@Nullable StructType requiredSchema,
+                                                             RowBuilder<GenericInternalRow> builder,
+                                                             List<SchemaFeature> features)
{
- RowBuilder builder;
- String[] fieldNames = null;
- if (columnFilter != null)
- {
-            builder = new PartialRowBuilder(columnFilter, cqlTable, hasProjectedValueColumns);
- fieldNames = columnFilter.fieldNames();
- }
- else
- {
- builder = new FullRowBuilder(cqlTable, hasProjectedValueColumns);
- }
-
- for (SchemaFeature feature : requestedFeatures)
+        Set<String> fieldNames = requiredSchema == null ? null : new HashSet<>(Arrays.asList(requiredSchema.fieldNames()));
+ for (SchemaFeature feature : features)
{
            // Only decorate when there is no column filter or when the field is requested in the query,
// otherwise we skip decoration
-            if (columnFilter == null || Arrays.stream(fieldNames).anyMatch(feature.fieldName()::equals))
+ if (fieldNames == null || fieldNames.contains(feature.fieldName()))
{
builder = feature.decorate(builder);
}
}
- builder.reset();
return builder;
}
-    /**
-     * PartialRowBuilder that builds row only containing fields in requiredSchema prune-column filter
-     * NOTE: Spark 3 changed the contract from Spark 2 and requires us to only return the columns specified in
-     * the requiredSchema 'prune column' filter and not a sparse Object[] array with null values for excluded columns
-     */
- static class PartialRowBuilder extends FullRowBuilder
+ public GenericInternalRow rowBuilder(Object[] valueArray)
{
- private final int[] positionsMap;
- private final boolean hasAllNonValueColumns;
- private final StructType requiredSchema;
-
- PartialRowBuilder(@NotNull StructType requiredSchema,
- CqlTable table,
- boolean hasProjectedValueColumns)
- {
- super(table, hasProjectedValueColumns);
- this.requiredSchema = requiredSchema;
-            Set<String> requiredColumns = Arrays.stream(requiredSchema.fields())
- .map(StructField::name)
- .collect(Collectors.toSet());
- hasAllNonValueColumns = table.fields().stream()
- .filter(CqlField::isNonValueColumn)
- .map(CqlField::name)
- .allMatch(requiredColumns::contains);
-
- // Map original column position to new position in requiredSchema
- positionsMap = IntStream.range(0, table.numFields())
- .map(position -> -1)
- .toArray();
- int position = 0;
- for (StructField structField : requiredSchema.fields())
- {
- CqlField field = table.getField(structField.name());
- if (field != null) // Field might be last modified timestamp
- {
- positionsMap[field.position()] = position++;
- }
- }
- }
-
- @Override
- public void reset()
- {
- count = 0;
- int totalColumns = requiredSchema.size();
- if (totalColumns > 0)
- {
- result = new Object[totalColumns];
- }
- else
- {
- result = EMPTY_RESULT;
- }
- }
-
- @Override
- public int fieldIndex(String name)
- {
-            return requiredSchema != null ? requiredSchema.fieldIndex(name) : super.fieldIndex(name);
- }
-
- @Override
- public void copyKeys(Cell cell)
- {
- if (hasAllNonValueColumns)
- {
-                // Optimization if we are returning all primary key/static columns we can use the super method
- super.copyKeys(cell);
- return;
- }
-
-            // Otherwise we need to only return columns requested and map to new position in result array
-            int length = !hasProjectedValueColumns ? cell.values.length : cell.values.length - 1;
- for (int index = 0; index < length; index++)
- {
- int position = positionsMap[index];
- if (position >= 0)
- {
- result[position] = cell.values[index];
- }
- }
- count += length;
- }
-
- @Override
- public void copyValue(Cell cell)
- {
- // Copy the next value column mapping column to new position
- int position = positionsMap[cell.position];
- if (position >= 0)
- {
- result[position] = cell.values[cell.values.length - 1];
- }
- count++;
- }
+ return new GenericInternalRow(valueArray);
}
}
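
The inner PartialRowBuilder removed above moves to its own file (see PartialRowBuilder.java in the file list). Its core trick is the positionsMap: each original column position maps either to its position in the pruned schema or to -1 if the column was not requested. A self-contained sketch of that mapping, assuming a table with columns (a, b, c) and a required schema of (c, a):

    // Sketch of the prune-column position mapping used by PartialRowBuilder.
    // a -> pruned position 1, b -> dropped (-1), c -> pruned position 0
    int[] positionsMap = {1, -1, 0};
    Object[] cellValues = {"a-val", "b-val", "c-val"};   // values in table order
    Object[] result = new Object[2];                     // sized to the pruned schema
    for (int index = 0; index < cellValues.length; index++)
    {
        int position = positionsMap[index];
        if (position >= 0)
        {
            result[position] = cellValues[index];
        }
    }
    // result is now {"c-val", "a-val"}, matching the required schema order
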
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeature.java b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeature.java
index 30b03f7..968b03d 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeature.java
+++ b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeature.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.cassandra.spark.data.CqlTable;
import org.apache.cassandra.spark.sparksql.RowBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
@@ -58,7 +59,7 @@ public interface SchemaFeature
* @param builder the row builder
* @return a new decorated builder
*/
- RowBuilder decorate(RowBuilder builder);
+ <T extends InternalRow> RowBuilder<T> decorate(RowBuilder<T> builder);
/**
* The option name used in the Spark options
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeatureSet.java b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeatureSet.java
index 2f08486..671fbdc 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeatureSet.java
+++ b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/config/SchemaFeatureSet.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
import org.apache.cassandra.spark.sparksql.LastModifiedTimestampDecorator;
import org.apache.cassandra.spark.sparksql.RowBuilder;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
@@ -43,9 +44,9 @@ public enum SchemaFeatureSet implements SchemaFeature
}
@Override
-            public RowBuilder decorate(RowBuilder builder)
+            public <T extends InternalRow> RowBuilder<T> decorate(RowBuilder<T> builder)
            {
-                return new LastModifiedTimestampDecorator(builder, fieldName());
+                return new LastModifiedTimestampDecorator<>(builder, fieldName());
}
};
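
A note on the new generics: the call site binds T, so a single SchemaFeature constant can decorate the Spark 2 builder (InternalRow) and the Spark 3 builder (GenericInternalRow) alike. A minimal illustration; the helper methods are hypothetical, not part of the patch:

    // Hypothetical helpers showing T being inferred per call site.
    static RowBuilder<InternalRow> decorateForSpark2(SchemaFeature feature, RowBuilder<InternalRow> base)
    {
        return feature.decorate(base);            // T inferred as InternalRow
    }

    static RowBuilder<GenericInternalRow> decorateForSpark3(SchemaFeature feature, RowBuilder<GenericInternalRow> base)
    {
        return feature.decorate(base);            // T inferred as GenericInternalRow
    }
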
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/LastModifiedTimestampDecorator.java b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/LastModifiedTimestampDecorator.java
index 5414838..fa1a279 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/LastModifiedTimestampDecorator.java
+++ b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/LastModifiedTimestampDecorator.java
@@ -19,17 +19,18 @@
package org.apache.cassandra.spark.sparksql;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.InternalRow;
/**
* Wrap a builder to append last modified timestamp
+ * @param <T> type of row returned by this builder
*/
-public class LastModifiedTimestampDecorator extends RowBuilderDecorator
+public class LastModifiedTimestampDecorator<T extends InternalRow> extends RowBuilderDecorator<T>
{
private final int lmtColumnPosition;
private long lastModified = 0L;
-    public LastModifiedTimestampDecorator(RowBuilder delegate, String fieldName)
+    public LastModifiedTimestampDecorator(RowBuilder<T> delegate, String fieldName)
{
super(delegate);
int width = internalExpandRow();
@@ -60,7 +61,7 @@ public class LastModifiedTimestampDecorator extends RowBuilderDecorator
}
@Override
- public GenericInternalRow build()
+ public T build()
{
// Append last modified timestamp
Object[] result = array();
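
Taken with the constructor hunk above (internalExpandRow() widening the row by one slot), the decorator's lifecycle is: reserve a trailing column at construction, track the latest cell timestamp while rows are assembled, then write it into the reserved slot in build(). A hedged usage sketch; the field name string here is an assumption, not taken from this diff:

    // Sketch: wrap a base builder so built rows carry a last-modified column.
    RowBuilder<InternalRow> base = new FullRowBuilder<>(cqlTable, hasProjectedValueColumns, GenericInternalRow::new);
    RowBuilder<InternalRow> withLmt = new LastModifiedTimestampDecorator<>(base, "last_modified_timestamp");
    withLmt.reset(); // allocate the widened result array before copying cells
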
diff --git a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilderDecorator.java b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilderDecorator.java
index 7dfbc59..dfbc9af 100644
--- a/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilderDecorator.java
+++ b/cassandra-analytics-spark-converter/src/main/java/org/apache/cassandra/spark/sparksql/RowBuilderDecorator.java
@@ -20,13 +20,13 @@
package org.apache.cassandra.spark.sparksql;
import org.apache.cassandra.spark.data.CqlTable;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.InternalRow;
-abstract class RowBuilderDecorator implements RowBuilder
+abstract class RowBuilderDecorator<T extends InternalRow> implements RowBuilder<T>
{
- protected final RowBuilder delegate;
+ protected final RowBuilder<T> delegate;
- RowBuilderDecorator(RowBuilder delegate)
+ RowBuilderDecorator(RowBuilder<T> delegate)
{
this.delegate = delegate;
}
@@ -116,7 +116,7 @@ abstract class RowBuilderDecorator implements RowBuilder
}
@Override
- public GenericInternalRow build()
+ public T build()
{
return delegate.build();
}
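
RowBuilderDecorator is a plain decorator: it holds the delegate and forwards every RowBuilder method by default, so subclasses override only the steps they extend, as LastModifiedTimestampDecorator does for build(). A minimal hypothetical subclass, using only members visible in this diff:

    // Hypothetical: a decorator that adds nothing, to show the minimal contract.
    class PassThroughDecorator<T extends InternalRow> extends RowBuilderDecorator<T>
    {
        PassThroughDecorator(RowBuilder<T> delegate)
        {
            super(delegate);
        }

        @Override
        public T build()
        {
            return delegate.build(); // no extra columns appended
        }
    }
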
diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
index 8b3883b..db63df8 100644
--- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
+++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
@@ -247,7 +247,7 @@ public abstract class AbstractStreamScanner implements StreamScanner<RowData>, C
return true;
}
-                    // For non-compact tables, set up a ClusteringColumnDataState to emit a Rid that emulates a
+                    // For non-compact tables, set up a ClusteringColumnDataState to emit a RowData that emulates a
                    // pre-3.0 CQL row marker. This is necessary for backwards compatibility with 2.1 & 2.0 output,
                    // and also for tables with only primary key columns defined.
                    // An empty PKLI is the 3.0 equivalent of having no row marker (e.g. row modifications via
@@ -319,7 +319,7 @@ public abstract class AbstractStreamScanner implements StreamScanner<RowData>, C
/**
 * Maps clustering values to column data, to emulate CQL row markers which were removed in Cassandra 3.0,
- * but which we must still emit Rid for in order to preserve backwards compatibility
+ * but which we must still emit RowData for in order to preserve backwards compatibility
* and to handle tables containing only primary key columns
*/
protected final class ClusteringColumnDataState implements ColumnDataState
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]