This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 014db08  CASSANDRA-18683: Add PartitionSizeTableProvider for reading the compressed and uncompressed sizes of all partitions in a table by utilizing the SSTable Index.db files
014db08 is described below

commit 014db08a79f00ef0d94e6855779e398c9dc689c1
Author: James Berragan <jberra...@apple.com>
AuthorDate: Wed Jul 19 12:23:07 2023 -0700

    CASSANDRA-18683: Add PartitionSizeTableProvider for reading the compressed and uncompressed sizes of all partitions in a table by utilizing the SSTable Index.db files

    Patch by James Berragan; Reviewed by Dinesh Joshi, Yifan Cai for CASSANDRA-18683
---
 CHANGES.txt                                        |   1 +
 cassandra-analytics-core/build.gradle              |  11 +
 .../cassandra/spark/data/FileSystemSSTable.java    |  15 +-
 .../spark/data/SidecarProvisionedSSTable.java      |  10 +
 .../spark/sparksql/SparkCellIterator.java          |  48 +--
 .../spark/sparksql/LocalPartitionSizeSource.java   |  28 +-
 .../spark/sparksql/PartitionSizeIterator.java      |  93 ++++++
 .../spark/sparksql/PartitionSizeTableProvider.java | 178 +++++++++++
 .../org/apache/cassandra/spark/TestDataLayer.java  |  11 +-
 .../java/org/apache/cassandra/spark/TestUtils.java |  48 +++
 .../apache/cassandra/spark/PartitionSizeTests.java |  95 ++++++
 .../apache/cassandra/bridge/CassandraBridge.java   |  56 ++--
 .../apache/cassandra/spark/data/BasicSupplier.java |   0
 .../org/apache/cassandra/spark/data/DataLayer.java |  64 +++-
 .../org/apache/cassandra/spark/data/SSTable.java   |   2 +
 .../cassandra/spark/reader/EmptyStreamScanner.java |   2 +-
 ...{EmptyStreamScanner.java => IndexConsumer.java} |  28 +-
 .../apache/cassandra/spark/reader/IndexEntry.java  |  71 +++++
 .../cassandra/spark/reader/StreamScanner.java      |  29 +-
 .../reader/common/AbstractCompressionMetadata.java |  15 +-
 .../IIndexReader.java}                             |  29 +-
 .../spark/reader/common/IndexIterator.java         | 184 ++++++++++++
 .../org/apache/cassandra/spark/stats/Stats.java    | 137 +++++++++
 .../cassandra/spark/utils/test/TestSSTable.java    |   6 +
 .../cassandra/spark/utils/test/TestSchema.java     |   5 +-
 cassandra-four-zero/build.gradle                   |   5 +
 .../bridge/CassandraBridgeImplementation.java      |  64 ++--
 .../spark/reader/AbstractStreamScanner.java        |   2 +-
 .../cassandra/spark/reader/CdcScannerBuilder.java  |   2 +-
 .../spark/reader/CompressionMetadata.java          |   4 +-
 .../apache/cassandra/spark/reader/IndexReader.java | 324 +++++++++++++++++++++
 .../apache/cassandra/spark/reader/ReaderUtils.java |   9 +-
 .../cassandra/spark/reader/IndexReaderTests.java   | 273 +++++++++++++++++
 33 files changed, 1677 insertions(+), 172 deletions(-)
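
For context, a minimal usage sketch of the new partition-size DataFrame source. This is not part of the patch: the keyspace, createStmt, and dirs values are hypothetical placeholders, the version and partitioner strings are assumed, and the options mirror those passed by TestUtils.openLocalPartitionSizeSource in the diff below. The resulting schema follows DataLayer.partitionSizeStructType(): the table's partition key columns followed by "uncompressed" and "compressed" long columns, as exercised by PartitionSizeTests.

// Illustrative sketch only (not part of this commit); option values are placeholders.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class PartitionSizeExample
{
    public static void main(String[] args)
    {
        SparkSession spark = SparkSession.builder()
                                         .master("local[*]")
                                         .appName("partition-size-example")
                                         .getOrCreate();

        Dataset<Row> sizes = spark.read()
                                  .format("org.apache.cassandra.spark.sparksql.LocalPartitionSizeSource")
                                  .option("keyspace", "test_keyspace")                                         // placeholder
                                  .option("createStmt", "CREATE TABLE test_keyspace.tbl (a text PRIMARY KEY, b int)") // placeholder
                                  .option("dirs", "/path/to/sstable/snapshot")                                 // placeholder
                                  .option("version", "FOURZERO")               // assumed CassandraVersion string
                                  .option("partitioner", "Murmur3Partitioner") // assumed Partitioner name
                                  .load();

        // Schema per DataLayer.partitionSizeStructType(): partition key columns,
        // then per-partition "uncompressed" and "compressed" on-disk sizes (long).
        sizes.show();
    }
}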

diff --git a/CHANGES.txt b/CHANGES.txt
index 41e2b04..37b126d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Expose per partition on-disk usage through new DataFrame that utilizes the Index.db SSTable file components (CASSANDRA-18683)
  * Fix bulk writes with Buffered RowBufferMode (CASSANDRA-18692)
  * Minor Refactoring to Improve Code Reusability (CASSANDRA-18684)
  * Fix cassandra-analytics-core-example (CASSANDRA-18662)
diff --git a/cassandra-analytics-core/build.gradle b/cassandra-analytics-core/build.gradle
index 572af8d..e1bd028 100644
--- a/cassandra-analytics-core/build.gradle
+++ b/cassandra-analytics-core/build.gradle
@@ -170,6 +170,17 @@ project(':cassandra-analytics-core') {
         finalizedBy(tasks.jacocoTestReport)
     }
     /* End: JaCoCo check */
+
+    configurations {
+        testArtifacts
+    }
+    task testJar(type: Jar) {
+        baseName = "${project.name}-test"
+        from sourceSets.test.output
+    }
+    artifacts {
+        testArtifacts testJar
+    }
 }
 
 private void writeBuildVersion(version, projectDir) {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
index 512f4c9..c8c4296 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
@@ -31,8 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.stats.Stats;
+import org.apache.cassandra.spark.utils.IOUtils;
 import org.apache.cassandra.spark.utils.ThrowableUtils;
 import org.apache.cassandra.spark.utils.streaming.SSTableInputStream;
+import org.jetbrains.annotations.Nullable;
 
 class FileSystemSSTable extends SSTable
 {
@@ -75,10 +77,21 @@ class FileSystemSSTable extends SSTable
         }
     }
 
+    public long length(FileType fileType)
+    {
+        return IOUtils.size(resolveComponentFile(fileType));
+    }
+
     @Override
     public boolean isMissing(FileType fileType)
     {
-        return FileType.resolveComponentFile(fileType, dataFilePath) == null;
+        return resolveComponentFile(fileType) == null;
+    }
+
+    @Nullable
+    private Path resolveComponentFile(FileType fileType)
+    {
+        return FileType.resolveComponentFile(fileType, dataFilePath);
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index e985512..43be542 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -127,6 +127,16 @@ public class SidecarProvisionedSSTable extends SSTable
         return openStream(snapshotFile.fileName, snapshotFile.size, fileType);
     }
 
+    public long length(FileType fileType)
+    {
+        ListSnapshotFilesResponse.FileInfo snapshotFile = components.get(fileType);
+        if (snapshotFile == null)
+        {
+            throw new IncompleteSSTableException(fileType);
+        }
+        return snapshotFile.size;
+    }
+
     @Override
     public boolean isMissing(FileType fileType)
     {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
index f40a88f..f5519bc 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java
@@ -59,13 +59,12 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
     private final Stats stats;
     private final CqlTable cqlTable;
     private final Object[] values;
-    private final int numPartitionKeys;
     private final boolean noValueColumns;
     @Nullable
     protected final PruneColumnFilter columnFilter;
     private final long startTimeNanos;
     @NotNull
-    private final StreamScanner scanner;
+    private final StreamScanner<Rid> scanner;
     @NotNull
     private final Rid rid;
 
@@ -88,7 +87,6 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         this.dataLayer = dataLayer;
         stats = dataLayer.stats();
         cqlTable = dataLayer.cqlTable();
-        numPartitionKeys = cqlTable.numPartitionKeys();
         columnFilter = buildColumnFilter(requiredSchema, cqlTable);
         if (columnFilter != null)
         {
@@ -118,9 +116,9 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         stats.openedSparkCellIterator();
     }
 
-    protected StreamScanner openScanner(int partitionId,
-                                        @NotNull List<PartitionKeyFilter> partitionKeyFilters,
-                                        @Nullable CdcOffsetFilter cdcOffsetFilter)
+    protected StreamScanner<Rid> openScanner(int partitionId,
+                                             @NotNull List<PartitionKeyFilter> partitionKeyFilters,
+                                             @Nullable CdcOffsetFilter cdcOffsetFilter)
     {
         return dataLayer.openCompactionScanner(partitionId, partitionKeyFilters, columnFilter);
     }
@@ -129,11 +127,11 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
     static PruneColumnFilter buildColumnFilter(@Nullable StructType requiredSchema, @NotNull CqlTable cqlTable)
     {
         return requiredSchema != null
-                ? new PruneColumnFilter(Arrays.stream(requiredSchema.fields())
-                                              .map(StructField::name)
-                                              .filter(cqlTable::has)
-                                              .collect(Collectors.toSet()))
-                : null;
+               ? new PruneColumnFilter(Arrays.stream(requiredSchema.fields())
+                                             .map(StructField::name)
+                                             .filter(cqlTable::has)
+                                             .collect(Collectors.toSet()))
+               : null;
     }
 
     public boolean noValueColumns()
@@ -339,21 +337,28 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
         }
 
         // Or new partition, so deserialize partition keys and update 'values' array
-        ByteBuffer partitionKey = rid.getPartitionKey();
-        if (numPartitionKeys == 1)
+        readPartitionKey(rid.getPartitionKey(), cqlTable, this.values, stats);
+    }
+
+    public static void readPartitionKey(ByteBuffer partitionKey,
+                                        CqlTable table,
+                                        Object[] values,
+                                        Stats stats)
+    {
+        if (table.numPartitionKeys() == 1)
         {
             // Not a composite partition key
-            CqlField field = cqlTable.partitionKeys().get(0);
-            values[field.position()] = deserialize(field, partitionKey);
+            CqlField field = table.partitionKeys().get(0);
+            values[field.position()] = deserialize(field, partitionKey, stats);
         }
         else
         {
             // Split composite partition keys
-            ByteBuffer[] partitionKeyBufs = ColumnTypes.split(partitionKey, numPartitionKeys);
+            ByteBuffer[] partitionKeyBufs = ColumnTypes.split(partitionKey, table.numPartitionKeys());
             int index = 0;
-            for (CqlField field : cqlTable.partitionKeys())
+            for (CqlField field : table.partitionKeys())
             {
-                values[field.position()] = deserialize(field, partitionKeyBufs[index++]);
+                values[field.position()] = deserialize(field, partitionKeyBufs[index++], stats);
             }
         }
     }
@@ -403,9 +408,14 @@ public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
     }
 
     private Object deserialize(CqlField field, ByteBuffer buffer)
+    {
+        return deserialize(field, buffer, stats);
+    }
+
+    private static Object deserialize(CqlField field, ByteBuffer buffer, Stats stats)
     {
         long now = System.nanoTime();
-        Object value = buffer != null ? field.deserialize(buffer) : null;
+        Object value = buffer == null ? null : field.deserialize(buffer);
         stats.fieldDeserialization(field, System.nanoTime() - now);
         return value;
     }
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/LocalPartitionSizeSource.java
similarity index 64%
copy from cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
copy to cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/LocalPartitionSizeSource.java
index b4c8e0e..cf24f1c 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
+++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/LocalPartitionSizeSource.java
@@ -17,31 +17,23 @@
  * under the License.
  */
 
-package org.apache.cassandra.spark.reader;
+package org.apache.cassandra.spark.sparksql;
 
-public class EmptyStreamScanner implements StreamScanner
-{
-    public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner();
-
-    @Override
-    public Rid rid()
-    {
-        return null;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        return false;
-    }
+import org.apache.cassandra.spark.data.DataLayer;
+import org.apache.cassandra.spark.data.LocalDataLayer;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
+public class LocalPartitionSizeSource extends PartitionSizeTableProvider
+{
     @Override
-    public void advanceToNextColumn()
+    public String shortName()
     {
+        return "localpartitionsizesource";
     }
 
     @Override
-    public void close()
+    public DataLayer getDataLayer(CaseInsensitiveStringMap options)
     {
+        return LocalDataLayer.from(options);
     }
 }
diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java
new file mode 100644
index 0000000..d1efa94
--- /dev/null
+++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeIterator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql;
+
+import java.io.IOException;
+
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.DataLayer;
+import org.apache.cassandra.spark.reader.IndexEntry;
+import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.stats.Stats;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Wrapper iterator around IndexIterator to read all Index.db files and return SparkSQL
+ * rows containing all partition keys and the associated on-disk uncompressed and compressed sizes.
+ */
+public class PartitionSizeIterator implements PartitionReader<InternalRow>
+{
+    private final StreamScanner<IndexEntry> it;
+    private final CqlTable cqlTable;
+    private final int numPartitionKeys;
+    private final Stats stats;
+    private final long startTimeNanos;
+    private GenericInternalRow curr = null;
+
+    public PartitionSizeIterator(int partitionId, @NotNull DataLayer dataLayer)
+    {
+        this.cqlTable = dataLayer.cqlTable();
+        this.numPartitionKeys = cqlTable.numPartitionKeys();
+        this.stats = dataLayer.stats();
+        this.startTimeNanos = System.nanoTime();
+        this.it = dataLayer.openPartitionSizeIterator(partitionId);
+        stats.openedPartitionSizeIterator(System.nanoTime() - startTimeNanos);
+    }
+
+    /**
+     * The expected schema is defined in {@link DataLayer#partitionSizeStructType()}.
+     * It consists of the Cassandra partition keys, appended with the columns "uncompressed" and "compressed".
+     */
+    public boolean next() throws IOException
+    {
+        if (it.hasNext())
+        {
+            it.advanceToNextColumn();
+
+            IndexEntry entry = it.rid();
+            Object[] values = new Object[numPartitionKeys + 2];
+
+            SparkCellIterator.readPartitionKey(entry.getPartitionKey(), cqlTable, values, stats);
+            values[numPartitionKeys] = entry.getUncompressed();
+            values[numPartitionKeys + 1] = entry.getCompressed();
+
+            this.curr = new GenericInternalRow(values);
+            stats.emitIndexEntry(entry);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    public InternalRow get()
+    {
+        return curr;
+    }
+
+    public void close() throws IOException
+    {
+        this.it.close();
+        stats.closedPartitionSizeIterator(System.nanoTime() - startTimeNanos);
+    }
+}
diff --git a/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeTableProvider.java b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeTableProvider.java
new file mode 100644
index 0000000..1f2eeb0
--- /dev/null
+++ b/cassandra-analytics-core/src/main/spark3/org/apache/cassandra/spark/sparksql/PartitionSizeTableProvider.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.sparksql;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.spark.data.DataLayer;
+import org.apache.spark.TaskContext;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public abstract class PartitionSizeTableProvider extends CassandraTableProvider
+{
+    public abstract DataLayer getDataLayer(CaseInsensitiveStringMap options);
+
+    @Override
+    public StructType inferSchema(CaseInsensitiveStringMap options)
+    {
+        return getDataLayerInternal(options).partitionSizeStructType();
+    }
+
+    @Override
+    public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties)
+    {
+        return new PartitionSizeTable(getDataLayerInternal(new CaseInsensitiveStringMap(properties)), schema);
+    }
+}
+
+class PartitionSizeTable implements Table, SupportsRead
+{
+    private final DataLayer dataLayer;
+    private final StructType schema;
+
+    PartitionSizeTable(DataLayer dataLayer, StructType schema)
+    {
+        this.dataLayer = dataLayer;
+        this.schema = schema;
+    }
+
+    @Override
+    public String name()
+    {
+        return dataLayer.cqlTable().keyspace() + "." + dataLayer.cqlTable().table();
+    }
+
+    @Override
+    public StructType schema()
+    {
+        return schema;
+    }
+
+    @Override
+    public Set<TableCapability> capabilities()
+    {
+        return ImmutableSet.of(TableCapability.BATCH_READ);
+    }
+
+    @Override
+    public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options)
+    {
+        return new PartitionSizeScanBuilder(dataLayer, schema, options);
+    }
+}
+
+class PartitionSizeScanBuilder implements ScanBuilder, Scan, Batch
+{
+    final DataLayer dataLayer;
+    final StructType schema;
+    final CaseInsensitiveStringMap options;
+
+    PartitionSizeScanBuilder(DataLayer dataLayer, StructType schema, CaseInsensitiveStringMap options)
+    {
+        this.dataLayer = dataLayer;
+        this.schema = schema;
+        this.options = options;
+    }
+
+    @Override
+    public Scan build()
+    {
+        return this;
+    }
+
+    @Override
+    public StructType readSchema()
+    {
+        return schema;
+    }
+
+    @Override
+    public Batch toBatch()
+    {
+        return this;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions()
+    {
+        return IntStream.range(0, this.dataLayer.partitionCount())
+                        .mapToObj(CassandraInputPartition::new)
+                        .toArray(InputPartition[]::new);
+    }
+
+    @Override
+    public MicroBatchStream toMicroBatchStream(String checkpointLocation)
+    {
+        return new CassandraMicroBatchStream(dataLayer, null, options);
+    }
+
+    @Override
+    public PartitionReaderFactory createReaderFactory()
+    {
+        return new PartitionSizeReaderFactory(dataLayer);
+    }
+}
+
+class PartitionSizeReaderFactory implements PartitionReaderFactory
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraPartitionReaderFactory.class);
+    final DataLayer dataLayer;
+
+    PartitionSizeReaderFactory(DataLayer dataLayer)
+    {
+        this.dataLayer = dataLayer;
+    }
+
+    @Override
+    public PartitionReader<InternalRow> createReader(InputPartition partition)
+    {
+        int partitionId;
+        if (partition instanceof CassandraInputPartition)
+        {
+            partitionId = ((CassandraInputPartition) partition).getPartitionId();
+        }
+        else
+        {
+            partitionId = TaskContext.getPartitionId();
+            LOGGER.warn("InputPartition is not of CassandraInputPartition type. Using TaskContext to determine the partitionId type={}, partitionId={}",
+                        partition.getClass().getName(), partitionId);
+        }
+        return new PartitionSizeIterator(partitionId, dataLayer);
+    }
+}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java
index 96267e6..6583c98 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestDataLayer.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -42,6 +43,7 @@ import org.apache.cassandra.spark.cdc.TableIdLookup;
 import org.apache.cassandra.spark.data.BasicSupplier;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.DataLayer;
+import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.data.SSTablesSupplier;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
@@ -123,9 +125,12 @@ public class TestDataLayer extends DataLayer
                                      @Nullable SparkRangeFilter sparkRangeFilter,
                                      @NotNull List<PartitionKeyFilter> partitionKeyFilters)
     {
-        return new BasicSupplier(dataDbFiles.stream()
-                                            .map(TestSSTable::at)
-                                            .collect(Collectors.toSet()));
+        return new BasicSupplier(listSSTables().collect(Collectors.toSet()));
+    }
+
+    public Stream<SSTable> listSSTables()
+    {
+        return dataDbFiles.stream().map(TestSSTable::at);
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
index 4b1f726..0144648 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/TestUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.spark;
 
 import java.io.IOException;
 import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -38,6 +39,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.bridge.CassandraBridge;
 import org.apache.cassandra.bridge.CassandraBridgeFactory;
@@ -50,6 +52,7 @@ import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
 import org.apache.cassandra.spark.data.partitioner.CassandraRing;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.utils.FilterUtils;
+import org.apache.cassandra.spark.utils.RandomUtils;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -190,6 +193,29 @@ public final class TestUtils
         }
     }
 
+    static Dataset<Row> openLocalPartitionSizeSource(Partitioner partitioner,
+                                                     Path dir,
+                                                     String keyspace,
+                                                     String createStmt,
+                                                     CassandraVersion version,
+                                                     Set<CqlField.CqlUdt> udts,
+                                                     @Nullable String statsClass)
+    {
+        DataFrameReader frameReader = SPARK.read().format("org.apache.cassandra.spark.sparksql.LocalPartitionSizeSource")
+                                           .option("keyspace", keyspace)
+                                           .option("createStmt", createStmt)
+                                           .option("dirs", dir.toAbsolutePath().toString())
+                                           .option("version", version.toString())
+                                           .option("useSSTableInputStream", true) // use in the test system to test the SSTableInputStream
+                                           .option("partitioner", partitioner.name())
+                                           .option("udts", udts.stream().map(f -> f.createStatement(keyspace)).collect(Collectors.joining("\n")));
+        if (statsClass != null)
+        {
+            frameReader = frameReader.option("statsClass", statsClass);
+        }
+        return frameReader.load();
+    }
+
     public static Dataset<Row> read(Path path, StructType schema)
     {
         return SPARK.read()
@@ -352,4 +378,26 @@ public final class TestUtils
         });
         return filterKeys;
     }
+
+    public static String randomLowEntropyString()
+    {
+        return new String(randomLowEntropyData(), StandardCharsets.UTF_8);
+    }
+
+    public static byte[] randomLowEntropyData()
+    {
+        return randomLowEntropyData(RandomUtils.randomPositiveInt(16384 - 512) + 512);
+    }
+
+    public static byte[] randomLowEntropyData(int size)
+    {
+        return randomLowEntropyData("Hello world!", size);
+    }
+
+    public static byte[] randomLowEntropyData(String str, int size)
+    {
+        return StringUtils.repeat(str, size / str.length() + 1)
+                          .substring(0, size)
+                          .getBytes(StandardCharsets.UTF_8);
+    }
 }
diff --git a/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/PartitionSizeTests.java b/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/PartitionSizeTests.java
new file mode 100644
index 0000000..c08b2b9
--- /dev/null
+++ b/cassandra-analytics-core/src/test/spark3/org/apache/cassandra/spark/PartitionSizeTests.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.cassandra.bridge.CassandraVersion;
+import org.apache.cassandra.spark.data.VersionRunner;
+import org.apache.cassandra.spark.utils.test.TestSchema;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class PartitionSizeTests extends VersionRunner
+{
+    @ParameterizedTest
+    @MethodSource("org.apache.cassandra.spark.data.VersionRunner#versions")
+    public void testReadingPartitionSize(CassandraVersion version)
+    {
+        TestUtils.runTest(version, (partitioner, dir, bridge) -> {
+            int numRows = Tester.DEFAULT_NUM_ROWS;
+            int numCols = 25;
+            TestSchema schema = TestSchema.builder()
+                                                .withPartitionKey("a", bridge.text())
+                                                .withClusteringKey("b", bridge.aInt())
+                                                .withColumn("c", bridge.aInt())
+                                                .withColumn("d", bridge.text()).build();
+
+            Map<String, Integer> sizes = new HashMap<>(numRows);
+            schema.writeSSTable(dir, bridge, partitioner, (writer) -> {
+                for (int i = 0; i < numRows; i++)
+                {
+                    String key = UUID.randomUUID().toString();
+                    int size = 0;
+                    for (int j = 0; j < numCols; j++)
+                    {
+                        String str = TestUtils.randomLowEntropyString();
+                        writer.write(key, j, i + j, str);
+                        size += 4 + 4 + str.getBytes(StandardCharsets.UTF_8).length;
+                    }
+                    sizes.put(key, size);
+                }
+            });
+
+            Dataset<Row> ds = TestUtils.openLocalPartitionSizeSource(partitioner,
+                                                                           dir,
+                                                                           schema.keyspace,
+                                                                           schema.createStatement,
+                                                                           version,
+                                                                           Collections.emptySet(),
+                                                                           null);
+            List<Row> rows = ds.collectAsList();
+            assertEquals(numRows, rows.size());
+            for (Row row : rows)
+            {
+                String key = row.getString(0);
+                long uncompressed = row.getLong(1);
+                long compressed = row.getLong(2);
+                assertTrue(sizes.containsKey(key));
+                long len = sizes.get(key);
+                assertTrue(len < uncompressed);
+                assertTrue(Math.abs(uncompressed - len) < 500); // uncompressed size should be ~len size but with a fixed overhead
+                assertTrue(compressed < uncompressed);
+                assertTrue(compressed / (float) uncompressed < 0.1);
+            }
+        });
+    }
+}
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
index 907c656..0239bdf 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java
@@ -63,6 +63,8 @@ import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.SSTablesSupplier;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.reader.IndexEntry;
+import org.apache.cassandra.spark.reader.Rid;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
@@ -94,32 +96,40 @@ public abstract class CassandraBridge
 
     // CDC Stream Scanner
     // CHECKSTYLE IGNORE: Method with many parameters
-    public abstract StreamScanner getCdcScanner(int partitionId,
-                                                @NotNull CqlTable table,
-                                                @NotNull Partitioner partitioner,
-                                                @NotNull CommitLogProvider commitLogProvider,
-                                                @NotNull TableIdLookup tableIdLookup,
-                                                @NotNull Stats stats,
-                                                @Nullable SparkRangeFilter sparkRangeFilter,
-                                                @Nullable CdcOffsetFilter offset,
-                                                int minimumReplicasPerMutation,
-                                                @NotNull Watermarker watermarker,
-                                                @NotNull String jobId,
-                                                @NotNull ExecutorService executorService,
-                                                @NotNull TimeProvider timeProvider);
+    public abstract StreamScanner<Rid> getCdcScanner(int partitionId,
+                                                     @NotNull CqlTable table,
+                                                     @NotNull Partitioner partitioner,
+                                                     @NotNull CommitLogProvider commitLogProvider,
+                                                     @NotNull TableIdLookup tableIdLookup,
+                                                     @NotNull Stats stats,
+                                                     @Nullable SparkRangeFilter sparkRangeFilter,
+                                                     @Nullable CdcOffsetFilter offset,
+                                                     int minimumReplicasPerMutation,
+                                                     @NotNull Watermarker watermarker,
+                                                     @NotNull String jobId,
+                                                     @NotNull ExecutorService executorService,
+                                                     @NotNull TimeProvider timeProvider);
 
     // Compaction Stream Scanner
     // CHECKSTYLE IGNORE: Method with many parameters
-    public abstract StreamScanner getCompactionScanner(@NotNull CqlTable table,
-                                                       @NotNull Partitioner partitionerType,
-                                                       @NotNull SSTablesSupplier ssTables,
-                                                       @Nullable SparkRangeFilter sparkRangeFilter,
-                                                       @NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
-                                                       @Nullable PruneColumnFilter columnFilter,
-                                                       @NotNull TimeProvider timeProvider,
-                                                       boolean readIndexOffset,
-                                                       boolean useIncrementalRepair,
-                                                       @NotNull Stats stats);
+    public abstract StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable table,
+                                                            @NotNull Partitioner partitionerType,
+                                                            @NotNull SSTablesSupplier ssTables,
+                                                            @Nullable SparkRangeFilter sparkRangeFilter,
+                                                            @NotNull Collection<PartitionKeyFilter> partitionKeyFilters,
+                                                            @Nullable PruneColumnFilter columnFilter,
+                                                            @NotNull TimeProvider timeProvider,
+                                                            boolean readIndexOffset,
+                                                            boolean useIncrementalRepair,
+                                                            @NotNull Stats stats);
+
+    public abstract StreamScanner<IndexEntry> getPartitionSizeIterator(@NotNull CqlTable table,
+                                                                       @NotNull Partitioner partitioner,
+                                                                       @NotNull SSTablesSupplier ssTables,
+                                                                       @Nullable SparkRangeFilter rangeFilter,
+                                                                       @NotNull TimeProvider timeProvider,
+                                                                       @NotNull Stats stats,
+                                                                       @NotNull ExecutorService executor);
 
     public abstract CassandraVersion getVersion();
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/BasicSupplier.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/BasicSupplier.java
similarity index 100%
rename from cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/BasicSupplier.java
rename to cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/BasicSupplier.java
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
index b2cd8c0..b7ba896 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/DataLayer.java
@@ -42,6 +42,8 @@ import org.apache.cassandra.spark.cdc.watermarker.Watermarker;
 import org.apache.cassandra.spark.config.SchemaFeature;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
 import org.apache.cassandra.spark.reader.EmptyStreamScanner;
+import org.apache.cassandra.spark.reader.IndexEntry;
+import org.apache.cassandra.spark.reader.Rid;
 import org.apache.cassandra.spark.reader.StreamScanner;
 import org.apache.cassandra.spark.sparksql.NoMatchFoundException;
 import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter;
@@ -53,6 +55,7 @@ import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.spark.sql.sources.EqualTo;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.In;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
@@ -67,6 +70,27 @@ public abstract class DataLayer implements Serializable
     {
     }
 
+    /**
+     * @return SparkSQL table schema expected for reading Partition sizes with PartitionSizeTableProvider.
+     */
+    public StructType partitionSizeStructType()
+    {
+        StructType structType = new StructType();
+        for (CqlField field : cqlTable().partitionKeys())
+        {
+            MetadataBuilder metadata = fieldMetaData(field);
+            structType = structType.add(field.name(),
+                                        field.type().sparkSqlType(bigNumberConfig(field)),
+                                        true,
+                                        metadata.build());
+        }
+
+        structType = structType.add("uncompressed", DataTypes.LongType);
+        structType = structType.add("compressed", DataTypes.LongType);
+
+        return structType;
+    }
+
     /**
      * Map Cassandra CQL table schema to SparkSQL StructType
      *
@@ -78,15 +102,7 @@ public abstract class DataLayer implements Serializable
         for (CqlField field : cqlTable().fields())
         {
             // Pass Cassandra field metadata in StructField metadata
-            MetadataBuilder metadata = new MetadataBuilder();
-            metadata.putLong("position", field.position());
-            metadata.putString("cqlType", field.cqlTypeName());
-            metadata.putBoolean("isPartitionKey", field.isPartitionKey());
-            metadata.putBoolean("isPrimaryKey", field.isPrimaryKey());
-            metadata.putBoolean("isClusteringKey", field.isClusteringColumn());
-            metadata.putBoolean("isStaticColumn", field.isStaticColumn());
-            metadata.putBoolean("isValueColumn", field.isValueColumn());
-
+            MetadataBuilder metadata = fieldMetaData(field);
             structType = structType.add(field.name(),
                                         field.type().sparkSqlType(bigNumberConfig(field)),
                                         true,
@@ -103,6 +119,19 @@ public abstract class DataLayer implements Serializable
         return structType;
     }
 
+    private MetadataBuilder fieldMetaData(CqlField field)
+    {
+        MetadataBuilder metadata = new MetadataBuilder();
+        metadata.putLong("position", field.position());
+        metadata.putString("cqlType", field.cqlTypeName());
+        metadata.putBoolean("isPartitionKey", field.isPartitionKey());
+        metadata.putBoolean("isPrimaryKey", field.isPrimaryKey());
+        metadata.putBoolean("isClusteringKey", field.isClusteringColumn());
+        metadata.putBoolean("isStaticColumn", field.isStaticColumn());
+        metadata.putBoolean("isValueColumn", field.isValueColumn());
+        return metadata;
+    }
+
     public List<SchemaFeature> requestedFeatures()
     {
         return Collections.emptyList();
@@ -271,9 +300,9 @@ public abstract class DataLayer implements Serializable
     /**
      * @return CompactionScanner for iterating over one or more SSTables, compacting data and purging tombstones
      */
-    public StreamScanner openCompactionScanner(int partitionId,
-                                               List<PartitionKeyFilter> partitionKeyFilters,
-                                               @Nullable PruneColumnFilter columnFilter)
+    public StreamScanner<Rid> openCompactionScanner(int partitionId,
+                                                    List<PartitionKeyFilter> partitionKeyFilters,
+                                                    @Nullable PruneColumnFilter columnFilter)
     {
         List<PartitionKeyFilter> filtersInRange;
         try
@@ -297,6 +326,17 @@ public abstract class DataLayer implements Serializable
                                              stats());
     }
 
+    /**
+     * @param partitionId Spark partition id
+     * @return a PartitionSizeIterator that iterates over Index.db files to calculate partition size.
+     */
+    public StreamScanner<IndexEntry> openPartitionSizeIterator(int partitionId)
+    {
+        SparkRangeFilter rangeFilter = sparkRangeFilter(partitionId);
+        return bridge().getPartitionSizeIterator(cqlTable(), partitioner(), sstables(partitionId, rangeFilter, Collections.emptyList()),
+                                                 rangeFilter, timeProvider(), stats(), executorService());
+    }
+
     /**
      * @return a TimeProvider that returns the time now in seconds. User can override with their own provider
      */
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/SSTable.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/SSTable.java
index 4ae1198..39cf693 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/SSTable.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/data/SSTable.java
@@ -77,6 +77,8 @@ public abstract class SSTable implements Serializable
         return Objects.requireNonNull(openInputStream(FileType.DATA), "Data.db SSTable file component must exist");
     }
 
+    public abstract long length(FileType fileType);
+
     public abstract boolean isMissing(FileType fileType);
 
     public void verify() throws IncompleteSSTableException
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
index b4c8e0e..822a8cf 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
@@ -19,7 +19,7 @@
 
 package org.apache.cassandra.spark.reader;
 
-public class EmptyStreamScanner implements StreamScanner
+public class EmptyStreamScanner implements StreamScanner<Rid>
 {
     public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner();
 
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexConsumer.java
similarity index 68%
copy from cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
copy to cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexConsumer.java
index b4c8e0e..c372bda 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexConsumer.java
@@ -19,29 +19,11 @@
 
 package org.apache.cassandra.spark.reader;
 
-public class EmptyStreamScanner implements StreamScanner
-{
-    public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner();
-
-    @Override
-    public Rid rid()
-    {
-        return null;
-    }
+import java.util.function.Consumer;
 
-    @Override
-    public boolean hasNext()
-    {
-        return false;
-    }
-
-    @Override
-    public void advanceToNextColumn()
-    {
-    }
+public interface IndexConsumer extends Consumer<IndexEntry>
+{
+    void onFailure(Throwable t);
 
-    @Override
-    public void close()
-    {
-    }
+    void onFinished(long runtimeNanos);
 }
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexEntry.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexEntry.java
new file mode 100644
index 0000000..b86a2ef
--- /dev/null
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/IndexEntry.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+
+public class IndexEntry
+{
+    public final ByteBuffer partitionKey;
+    public final BigInteger token;
+    public final long uncompressed;
+    public final long compressed;
+
+    public IndexEntry(ByteBuffer partitionKey,
+                      BigInteger token,
+                      long uncompressed,
+                      long compressed)
+    {
+        this.partitionKey = partitionKey;
+        this.token = token;
+        this.uncompressed = uncompressed;
+        this.compressed = compressed;
+    }
+
+    public ByteBuffer getPartitionKey()
+    {
+        return this.partitionKey;
+    }
+
+    public BigInteger getToken()
+    {
+        return this.token;
+    }
+
+    public long getCompressed()
+    {
+        return compressed;
+    }
+
+    public long getUncompressed()
+    {
+        return uncompressed;
+    }
+
+    @Override
+    public String toString()
+    {
+        return ByteBufferUtils.toHexString(this.partitionKey)
+               + ": " + uncompressed + " uncompressed bytes, " + compressed + 
" compressed bytes";
+    }
+}
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
index 2f58f19..2a4e975 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/StreamScanner.java
@@ -28,19 +28,19 @@ import java.io.IOException;
  * they belong to:
  * <p>
  * Cassandra:
- *     r1 | c1, c2, c3
- *     r2 | c4
- *     r3 | c5, c6, c7, c8
+ * r1 | c1, c2, c3
+ * r2 | c4
+ * r3 | c5, c6, c7, c8
  * <p>
  * Pivoted:
- *     r1 | c1
- *     r1 | c2
- *     r1 | c3
- *     r2 | c4
- *     r3 | c5
- *     r3 | c6
- *     r3 | c7
- *     r3 | c8
+ * r1 | c1
+ * r1 | c2
+ * r1 | c3
+ * r2 | c4
+ * r3 | c5
+ * r3 | c6
+ * r3 | c7
+ * r3 | c8
  * <p>
  * During a loading operation we will extract up to a few trillion items out of SSTables, so it is of
  * high importance to reuse objects - the caller to the scanner creates a rid using the
@@ -49,9 +49,10 @@ import java.io.IOException;
  * <p>
  * Upon return from the next() call the current values of the scanner can be obtained by calling
  * the methods in Rid, getPartitionKey(), getColumnName(), getValue().
+ * @param <Type> type of object returned by rid() method.
  */
 @SuppressWarnings("unused")
-public interface StreamScanner extends Closeable
+public interface StreamScanner<Type> extends Closeable
 {
     /**
      * Expose the data/rid to be consumed.
@@ -59,13 +60,13 @@ public interface StreamScanner extends Closeable
      *
      * @return rid
      */
-    Rid rid();
+    Type rid();
 
     /**
      * Indicate if there are more data/rid avaiable
      *
      * @return true when the rid is available to be consumed;
-     *         otherwise, return false to indicate the scanner has exhausted
+     * otherwise, return false to indicate the scanner has exhausted
      * @throws IOException
      */
     boolean hasNext() throws IOException;
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/AbstractCompressionMetadata.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/AbstractCompressionMetadata.java
index 985b827..4fa69d4 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/AbstractCompressionMetadata.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/AbstractCompressionMetadata.java
@@ -45,7 +45,7 @@ public abstract class AbstractCompressionMetadata
         return dataLength;
     }
 
-    private Chunk chunkAtIndex(int index)
+    public Chunk chunkAtIndex(int index)
     {
         long chunkOffset = chunkOffsets.get(index);
         long nextChunkOffset = (index + 1 == chunkOffsets.size) ? -1 : chunkOffsets.get(index + 1);
 
         return new Chunk(chunkOffset, (nextChunkOffset == -1) ? -1 : (int) (nextChunkOffset - chunkOffset - 4));
     }
 
+    /**
+     * @param position uncompressed position
+     * @return the compressed chunk index for an uncompressed position.
+     */
+    public int chunkIdx(long position)
+    {
+        return (int) (position / chunkLength());
+    }
+
     /**
      * Get a chunk of compressed data (offset, length) corresponding to given position
      *
@@ -64,7 +73,7 @@ public abstract class AbstractCompressionMetadata
     public Chunk chunkAtPosition(long position) throws IOException
     {
         // Position of the chunk
-        int index = (int) (position / chunkLength());
+        int index = chunkIdx(position);
 
         if (index >= chunkOffsets.size)
         {
@@ -82,7 +91,7 @@ public abstract class AbstractCompressionMetadata
         public final long offset;
         public int length;  // CHECKSTYLE IGNORE: Public mutable field
 
-        Chunk(long offset, int length)
+        public Chunk(long offset, int length)
         {
             this.offset = offset;
             this.length = length;
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IIndexReader.java
similarity index 64%
copy from cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
copy to cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IIndexReader.java
index b4c8e0e..6930655 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/EmptyStreamScanner.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IIndexReader.java
@@ -17,31 +17,10 @@
  * under the License.
  */
 
-package org.apache.cassandra.spark.reader;
+package org.apache.cassandra.spark.reader.common;
 
-public class EmptyStreamScanner implements StreamScanner
-{
-    public static final EmptyStreamScanner INSTANCE = new EmptyStreamScanner();
-
-    @Override
-    public Rid rid()
-    {
-        return null;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-        return false;
-    }
+import org.apache.cassandra.spark.reader.SparkSSTableReader;
 
-    @Override
-    public void advanceToNextColumn()
-    {
-    }
-
-    @Override
-    public void close()
-    {
-    }
+public interface IIndexReader extends SparkSSTableReader
+{
 }
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java
new file mode 100644
index 0000000..1103153
--- /dev/null
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/reader/common/IndexIterator.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader.common;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.data.SSTablesSupplier;
+import org.apache.cassandra.spark.reader.IndexConsumer;
+import org.apache.cassandra.spark.reader.IndexEntry;
+import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.stats.Stats;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Iterator for reading through IndexEntries for multiple Index.db files.
+ *
+ * @param <ReaderType> the IIndexReader implementation used to read each Index.db file
+ */
+public class IndexIterator<ReaderType extends IIndexReader> implements 
StreamScanner<IndexEntry>, IndexConsumer
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexIterator.class);
+
+    private final AtomicInteger finished = new AtomicInteger(0);
+    private final AtomicReference<Throwable> failure = new AtomicReference<>();
+
+    private final Set<ReaderType> readers;
+    private final LinkedBlockingQueue<IndexEntry> queue = new 
LinkedBlockingQueue<>();
+    private final long startTimeNanos;
+    private final Stats stats;
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private IndexEntry curr = null;
+
+    public IndexIterator(@NotNull SSTablesSupplier ssTables,
+                         @NotNull Stats stats,
+                         @NotNull IndexReaderOpener<ReaderType> supplier)
+    {
+        this.startTimeNanos = System.nanoTime();
+        this.stats = stats;
+        this.readers = ssTables.openAll((ssTable, isRepairPrimary) -> 
supplier.openReader(ssTable, isRepairPrimary, this));
+        stats.openedIndexFiles(System.nanoTime() - startTimeNanos);
+    }
+
+    public interface IndexReaderOpener<ReaderType extends IIndexReader>
+    {
+        ReaderType openReader(SSTable ssTable, boolean isRepairPrimary, 
IndexConsumer consumer) throws IOException;
+    }
+
+    public void accept(IndexEntry wrapper)
+    {
+        if (closed.get())
+        {
+            return;
+        }
+
+        try
+        {
+            queue.put(wrapper);
+            stats.indexEntryConsumed();
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    public IndexEntry rid()
+    {
+        return Objects.requireNonNull(curr, "advanceToNextColumn() must be 
called before rid()");
+    }
+
+    public boolean hasNext() throws IOException
+    {
+        return isRunning() && noFailures() && (hasPendingItems() || 
notFinished());
+    }
+
+    protected boolean isRunning()
+    {
+        return !closed.get();
+    }
+
+    protected boolean hasPendingItems()
+    {
+        return !queue.isEmpty();
+    }
+
+    protected boolean noFailures()
+    {
+        return !maybeFail();
+    }
+
+    protected boolean maybeFail()
+    {
+        Throwable t = failure.get();
+        if (t != null)
+        {
+            throw new RuntimeException(t);
+        }
+        return false;
+    }
+
+    private boolean notFinished()
+    {
+        return !isFinished();
+    }
+
+    private boolean isFinished()
+    {
+        return finished.get() == readers.size();
+    }
+
+    public void advanceToNextColumn()
+    {
+        if (closed.get())
+        {
+            throw new IllegalStateException("Iterator closed");
+        }
+
+        try
+        {
+            long startTimeNanos = System.nanoTime();
+            this.curr = queue.take();
+            stats.indexIteratorTimeBlocked(System.nanoTime() - startTimeNanos);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void close()
+    {
+        if (!this.closed.get() && this.closed.compareAndSet(false, true))
+        {
+            queue.clear();
+            stats.closedIndexIterator(System.nanoTime() - startTimeNanos);
+        }
+    }
+
+    public void onFailure(Throwable t)
+    {
+        LOGGER.warn("IndexReader failed with exception", t);
+        stats.indexReaderFailure(t);
+        if (failure.get() == null)
+        {
+            failure.compareAndSet(null, t);
+        }
+    }
+
+    public void onFinished(long runtimeNanos)
+    {
+        stats.indexReaderFinished(runtimeNanos);
+        finished.incrementAndGet();
+    }
+}
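
    For reference, a minimal sketch of how a caller drains this iterator through the StreamScanner
    contract (illustrative only, not part of the patch; it assumes ssTables, metadata, rangeFilter
    and stats are already in scope, mirroring getPartitionSizeIterator() in
    CassandraBridgeImplementation below):

        IndexIterator<IndexReader> it = new IndexIterator<>(ssTables, stats,
                (ssTable, isRepairPrimary, consumer) -> new IndexReader(ssTable, metadata, rangeFilter, stats, consumer));
        try
        {
            // hasNext() may throw IOException; let it propagate for brevity
            while (it.hasNext())
            {
                it.advanceToNextColumn();       // blocks until a reader queues the next entry
                IndexEntry entry = it.rid();    // current entry, valid until the next advance
                // entry.uncompressed and entry.compressed hold the per-partition sizes in bytes
            }
        }
        finally
        {
            it.close();
        }
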
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
index 45e5400..7593bf8 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.spark.cdc.IPartitionUpdateWrapper;
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.SSTable;
 import org.apache.cassandra.spark.data.SSTablesSupplier;
+import org.apache.cassandra.spark.reader.IndexEntry;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.spark.utils.streaming.SSTableSource;
@@ -664,4 +665,140 @@ public abstract class Stats
     public void droppedOldMutation(long maxTimestampMicros)
     {
     }
+
+    // PartitionSizeIterator stats
+
+    /**
+     * @param timeToOpenNanos time taken to open PartitionSizeIterator in nanos
+     */
+    public void openedPartitionSizeIterator(long timeToOpenNanos)
+    {
+
+    }
+
+    /**
+     * @param entry a single emitted IndexEntry.
+     */
+    public void emitIndexEntry(IndexEntry entry)
+    {
+
+    }
+
+    /**
+     * @param timeNanos the time in nanos spent blocked waiting for the next
IndexEntry.
+     */
+    public void indexIteratorTimeBlocked(long timeNanos)
+    {
+
+    }
+
+    /**
+     * @param timeNanos time taken for the PartitionSizeIterator to run, in
nanos.
+     */
+    public void closedPartitionSizeIterator(long timeNanos)
+    {
+
+    }
+
+    /**
+     * @param timeToOpenNanos time taken to open Index.db files in nanos
+     */
+    public void openedIndexFiles(long timeToOpenNanos)
+    {
+
+    }
+
+    /**
+     * @param timeToOpenNanos time in nanos the IndexIterator was open for.
+     */
+    public void closedIndexIterator(long timeToOpenNanos)
+    {
+
+    }
+
+    /**
+     * An index reader closed with a failure.
+     *
+     * @param t throwable
+     */
+    public void indexReaderFailure(Throwable t)
+    {
+
+    }
+
+    /**
+     * An index reader closed successfully.
+     *
+     * @param runtimeNanos time in nanos the IndexReader was open for.
+     */
+    public void indexReaderFinished(long runtimeNanos)
+    {
+
+    }
+
+    /**
+     * IndexReader skipped out-of-range partition keys.
+     *
+     * @param skipAhead number of bytes skipped.
+     */
+    public void indexBytesSkipped(long skipAhead)
+    {
+
+    }
+
+    /**
+     * IndexReader read bytes.
+     *
+     * @param bytesRead number of bytes read.
+     */
+    public void indexBytesRead(long bytesRead)
+    {
+
+    }
+
+    /**
+     * When a single index entry is consumed.
+     */
+    public void indexEntryConsumed()
+    {
+
+    }
+
+    /**
+     * The Summary.db file was read to check the start-end token range of the
associated Index.db file.
+     *
+     * @param timeNanos time taken in nanos.
+     */
+    public void indexSummaryFileRead(long timeNanos)
+    {
+
+    }
+
+    /**
+     * CompressionInfo.db file was read.
+     *
+     * @param timeNanos time taken in nanos.
+     */
+    public void indexCompressionFileRead(long timeNanos)
+    {
+
+    }
+
+    /**
+     * The Index.db file was fully read.
+     *
+     * @param timeNanos time taken in nanos.
+     */
+    public void indexFileRead(long timeNanos)
+    {
+
+    }
+
+    /**
+     * When an Index.db file can be fully skipped because it does not overlap
with the token range.
+     */
+    public void indexFileSkipped()
+    {
+
+    }
 }
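
    As an illustrative sketch (not part of the patch), a Stats implementation can surface the new
    Index.db metrics by overriding just the hooks it cares about; this assumes the remaining Stats
    hooks keep their no-op defaults and that the class name IndexReadStats is hypothetical:

        import java.util.concurrent.TimeUnit;

        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;

        import org.apache.cassandra.spark.stats.Stats;

        public class IndexReadStats extends Stats
        {
            private static final Logger LOGGER = LoggerFactory.getLogger(IndexReadStats.class);

            @Override
            public void indexFileRead(long timeNanos)
            {
                // called once per fully-read Index.db file
                LOGGER.info("Read Index.db file in {} ms", TimeUnit.NANOSECONDS.toMillis(timeNanos));
            }

            @Override
            public void indexFileSkipped()
            {
                // called when an Index.db file does not overlap the token range and is skipped entirely
                LOGGER.info("Skipped non-overlapping Index.db file");
            }
        }
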
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
index 423ea5a..d7e42c4 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSSTable.java
@@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.utils.IOUtils;
 import org.jetbrains.annotations.Nullable;
 
 public final class TestSSTable extends SSTable
@@ -108,6 +109,11 @@ public final class TestSSTable extends SSTable
         }
     }
 
+    public long length(FileType fileType)
+    {
+        return IOUtils.size(FileType.resolveComponentFile(fileType, dataFile));
+    }
+
     public boolean isMissing(FileType fileType)
     {
         return FileType.resolveComponentFile(fileType, dataFile) == null;
diff --git 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
index f5022f4..3f738e0 100644
--- 
a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
+++ 
b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/utils/test/TestSchema.java
@@ -192,6 +192,8 @@ public final class TestSchema
     public final String keyspace;
     public final String table;
     public final String createStatement;
+    public final ReplicationFactor rf = new 
ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
+                                                              
ImmutableMap.of("DC1", 3));
     public final String insertStatement;
     public final String updateStatement;
     public final String deleteStatement;
@@ -428,8 +430,7 @@ public final class TestSchema
         return new CqlTable(keyspace,
                             table,
                             createStatement,
-                            new 
ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
-                                                  ImmutableMap.of("DC1", 3)),
+                            rf,
                             allFields,
                             udts,
                             0);
diff --git a/cassandra-four-zero/build.gradle b/cassandra-four-zero/build.gradle
index 01bbe7c..cee96b7 100644
--- a/cassandra-four-zero/build.gradle
+++ b/cassandra-four-zero/build.gradle
@@ -40,6 +40,7 @@ project(':cassandra-four-zero') {
 
         testImplementation(project(':cassandra-bridge'))
 
+        testImplementation project(path: ':cassandra-analytics-core', 
configuration: 'testArtifacts')
         
testImplementation("org.junit.jupiter:junit-jupiter-api:${project.junitVersion}")
         
testImplementation("org.junit.jupiter:junit-jupiter-params:${project.junitVersion}")
         
testImplementation("org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}")
@@ -57,6 +58,10 @@ project(':cassandra-four-zero') {
         enabled = false
     }
 
+    test {
+        useJUnitPlatform()
+    }
+
     shadowJar {
         archiveFileName = 'four-zero.jar'  // Must match label in 
CassandraVersion (without extension)
         zip64 = true
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
index 9b50486..6dabed8 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/bridge/CassandraBridgeImplementation.java
@@ -133,8 +133,12 @@ import org.apache.cassandra.spark.data.types.VarChar;
 import org.apache.cassandra.spark.data.types.VarInt;
 import org.apache.cassandra.spark.reader.CdcScannerBuilder;
 import org.apache.cassandra.spark.reader.CompactionStreamScanner;
+import org.apache.cassandra.spark.reader.IndexEntry;
+import org.apache.cassandra.spark.reader.IndexReader;
+import org.apache.cassandra.spark.reader.Rid;
 import org.apache.cassandra.spark.reader.SchemaBuilder;
 import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.reader.common.IndexIterator;
 import org.apache.cassandra.spark.sparksql.filters.CdcOffsetFilter;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
@@ -257,19 +261,19 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
     }
 
     @Override
-    public StreamScanner getCdcScanner(int partitionId,
-                                       @NotNull CqlTable table,
-                                       @NotNull Partitioner partitioner,
-                                       @NotNull CommitLogProvider 
commitLogProvider,
-                                       @NotNull TableIdLookup tableIdLookup,
-                                       @NotNull Stats stats,
-                                       @Nullable SparkRangeFilter 
sparkRangeFilter,
-                                       @Nullable CdcOffsetFilter offset,
-                                       int minimumReplicasPerMutation,
-                                       @NotNull Watermarker watermarker,
-                                       @NotNull String jobId,
-                                       @NotNull ExecutorService 
executorService,
-                                       @NotNull TimeProvider timeProvider)
+    public StreamScanner<Rid> getCdcScanner(int partitionId,
+                                            @NotNull CqlTable table,
+                                            @NotNull Partitioner partitioner,
+                                            @NotNull CommitLogProvider 
commitLogProvider,
+                                            @NotNull TableIdLookup 
tableIdLookup,
+                                            @NotNull Stats stats,
+                                            @Nullable SparkRangeFilter 
sparkRangeFilter,
+                                            @Nullable CdcOffsetFilter offset,
+                                            int minimumReplicasPerMutation,
+                                            @NotNull Watermarker watermarker,
+                                            @NotNull String jobId,
+                                            @NotNull ExecutorService 
executorService,
+                                            @NotNull TimeProvider timeProvider)
     {
         // NOTE: Need to use SchemaBuilder to init keyspace if not already set 
in Cassandra schema instance
         UUID tableId = tableIdLookup.lookup(table.keyspace(), table.table());
@@ -300,16 +304,16 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
     }
 
     @Override
-    public StreamScanner getCompactionScanner(@NotNull CqlTable table,
-                                              @NotNull Partitioner partitioner,
-                                              @NotNull SSTablesSupplier 
ssTables,
-                                              @Nullable SparkRangeFilter 
sparkRangeFilter,
-                                              @NotNull 
Collection<PartitionKeyFilter> partitionKeyFilters,
-                                              @Nullable PruneColumnFilter 
columnFilter,
-                                              @NotNull TimeProvider 
timeProvider,
-                                              boolean readIndexOffset,
-                                              boolean useIncrementalRepair,
-                                              @NotNull Stats stats)
+    public StreamScanner<Rid> getCompactionScanner(@NotNull CqlTable table,
+                                                   @NotNull Partitioner 
partitioner,
+                                                   @NotNull SSTablesSupplier 
ssTables,
+                                                   @Nullable SparkRangeFilter 
sparkRangeFilter,
+                                                   @NotNull 
Collection<PartitionKeyFilter> partitionKeyFilters,
+                                                   @Nullable PruneColumnFilter 
columnFilter,
+                                                   @NotNull TimeProvider 
timeProvider,
+                                                   boolean readIndexOffset,
+                                                   boolean 
useIncrementalRepair,
+                                                   @NotNull Stats stats)
     {
         // NOTE: Need to use SchemaBuilder to init keyspace if not already set 
in Cassandra Schema instance
         SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner);
@@ -326,6 +330,20 @@ public class CassandraBridgeImplementation extends 
CassandraBridge
                                                                .build()));
     }
 
+    public StreamScanner<IndexEntry> getPartitionSizeIterator(@NotNull 
CqlTable table,
+                                                              @NotNull 
Partitioner partitioner,
+                                                              @NotNull 
SSTablesSupplier ssTables,
+                                                              @Nullable 
SparkRangeFilter rangeFilter,
+                                                              @NotNull 
TimeProvider timeProvider,
+                                                              @NotNull Stats 
stats,
+                                                              @NotNull 
ExecutorService executor)
+    {
+        // NOTE: Need to use SchemaBuilder to init keyspace if not already set
in Cassandra Schema instance
+        SchemaBuilder schemaBuilder = new SchemaBuilder(table, partitioner);
+        TableMetadata metadata = schemaBuilder.tableMetaData();
+        return new IndexIterator<>(ssTables, stats, ((ssTable, 
isRepairPrimary, consumer) -> new IndexReader(ssTable, metadata, rangeFilter, 
stats, consumer)));
+    }
+
     @Override
     public CassandraVersion getVersion()
     {
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
index 9cae19f..3e9b72d 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java
@@ -54,7 +54,7 @@ import org.apache.cassandra.spark.utils.TimeProvider;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.jetbrains.annotations.NotNull;
 
-public abstract class AbstractStreamScanner implements StreamScanner, Closeable
+public abstract class AbstractStreamScanner implements StreamScanner<Rid>, 
Closeable
 {
     // All partitions in the SSTable
     private UnfilteredPartitionIterator allPartitions;
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java
index bd7d17f..a1d9112 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CdcScannerBuilder.java
@@ -239,7 +239,7 @@ public class CdcScannerBuilder
         return null;
     }
 
-    public StreamScanner build()
+    public StreamScanner<Rid> build()
     {
         // Block on futures to read all CommitLog mutations and pass over to 
SortedStreamScanner
         List<PartitionUpdateWrapper> updates = futures.values().stream()
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
index e820180..1c89853 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressionMetadata.java
@@ -34,7 +34,9 @@ import org.apache.cassandra.spark.reader.common.BigLongArray;
 /**
  * Holds metadata about compressed file
  */
-final class CompressionMetadata extends AbstractCompressionMetadata
+// CompressionMetadata is mocked in IndexReaderTests and mockito does not 
support mocking final classes
+// CHECKSTYLE IGNORE: FinalClass
+class CompressionMetadata extends AbstractCompressionMetadata
 {
 
     private final CompressionParams parameters;
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
new file mode 100644
index 0000000..1c57d3a
--- /dev/null
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/IndexReader.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.spark.data.FileType;
+import org.apache.cassandra.spark.data.IncompleteSSTableException;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.reader.common.AbstractCompressionMetadata;
+import org.apache.cassandra.spark.reader.common.IIndexReader;
+import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.spark.stats.Stats;
+import org.apache.cassandra.spark.utils.ByteBufferUtils;
+import org.apache.cassandra.utils.vint.VIntCoding;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+public class IndexReader implements IIndexReader
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexReader.class);
+
+    private TokenRange ssTableRange = null;
+
+    public IndexReader(@NotNull SSTable ssTable,
+                       @NotNull TableMetadata metadata,
+                       @Nullable SparkRangeFilter rangeFilter,
+                       @NotNull Stats stats,
+                       @NotNull IndexConsumer consumer)
+    {
+        long now = System.nanoTime();
+        long startTimeNanos = now;
+        try
+        {
+            File file = SSTableReader.constructFilename(metadata.keyspace, 
metadata.name, ssTable.getDataFileName());
+            Descriptor descriptor = Descriptor.fromFilename(file);
+            Version version = descriptor.version;
+
+            // if there is a range filter we can use the Summary.db file to
seek to the approximate start location of the token range in the Index.db file
+            long skipAhead = -1;
+            now = System.nanoTime();
+            if (rangeFilter != null)
+            {
+                SummaryDbUtils.Summary summary = 
SSTableCache.INSTANCE.keysFromSummary(metadata, ssTable);
+                if (summary != null)
+                {
+                    this.ssTableRange = 
TokenRange.closed(ReaderUtils.tokenToBigInteger(summary.first().getToken()),
+                                                          
ReaderUtils.tokenToBigInteger(summary.last().getToken()));
+                    if (!rangeFilter.overlaps(this.ssTableRange))
+                    {
+                        LOGGER.info("Skipping non-overlapping Index.db file 
rangeFilter='[{},{}]' sstableRange='[{},{}]'",
+                                    rangeFilter.tokenRange().lowerEndpoint(), 
rangeFilter.tokenRange().upperEndpoint(),
+                                    this.ssTableRange.lowerEndpoint(), 
this.ssTableRange.upperEndpoint());
+                        stats.indexFileSkipped();
+                        return;
+                    }
+
+                    skipAhead = summary.summary().getPosition(
+                    SummaryDbUtils.binarySearchSummary(summary.summary(), 
metadata.partitioner, rangeFilter.tokenRange().lowerEndpoint())
+                    );
+                    stats.indexSummaryFileRead(System.nanoTime() - now);
+                    now = System.nanoTime();
+                }
+            }
+
+            // read CompressionMetadata if it exists
+            CompressionMetadata compressionMetadata = 
getCompressionMetadata(ssTable, version.hasMaxCompressedLength());
+            if (compressionMetadata != null)
+            {
+                stats.indexCompressionFileRead(System.nanoTime() - now);
+                now = System.nanoTime();
+            }
+
+            // read through Index.db and consume Partition keys
+            try (InputStream is = ssTable.openPrimaryIndexStream())
+            {
+                if (is == null)
+                {
+                    consumer.onFailure(new 
IncompleteSSTableException(FileType.INDEX));
+                    return;
+                }
+
+                consumePrimaryIndex(metadata.partitioner,
+                                    is,
+                                    ssTable,
+                                    compressionMetadata,
+                                    rangeFilter,
+                                    stats,
+                                    skipAhead,
+                                    consumer);
+                stats.indexFileRead(System.nanoTime() - now);
+            }
+        }
+        catch (Throwable t)
+        {
+            consumer.onFailure(t);
+        }
+        finally
+        {
+            consumer.onFinished(System.nanoTime() - startTimeNanos);
+        }
+    }
+
+    @Nullable
+    public static CompressionMetadata getCompressionMetadata(SSTable ssTable,
+                                                             boolean 
hasMaxCompressedLength) throws IOException
+    {
+        try (InputStream cis = ssTable.openCompressionStream())
+        {
+            if (cis != null)
+            {
+                return CompressionMetadata.fromInputStream(cis, 
hasMaxCompressedLength);
+            }
+        }
+        return null;
+    }
+
+    @SuppressWarnings("InfiniteLoopStatement")
+    static void consumePrimaryIndex(@NotNull IPartitioner partitioner,
+                                    @NotNull InputStream primaryIndex,
+                                    @NotNull SSTable ssTable,
+                                    @Nullable CompressionMetadata 
compressionMetadata,
+                                    @Nullable SparkRangeFilter range,
+                                    @NotNull Stats stats,
+                                    long skipBytes,
+                                    @NotNull IndexConsumer consumer) throws 
IOException
+    {
+        long primaryIndexLength = ssTable.length(FileType.INDEX);
+        long dataDbFileLength = ssTable.length(FileType.DATA);
+        try (DataInputStream dis = new DataInputStream(primaryIndex))
+        {
+            if (skipBytes > 0)
+            {
+                ByteBufferUtils.skipFully(dis, skipBytes);
+                stats.indexBytesSkipped(skipBytes);
+            }
+
+            ByteBuffer prevKey = null;
+            long prevPos = 0;
+            BigInteger prevToken = null;
+            boolean started = false;
+
+            long totalBytesRead = 0;
+            try
+            {
+                while (true)
+                {
+                    // read partition key length
+                    int len = dis.readUnsignedShort();
+
+                    // read partition key & decorate
+                    byte[] buf = new byte[len];
+                    dis.readFully(buf);
+                    ByteBuffer key = ByteBuffer.wrap(buf);
+                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
+                    BigInteger token = 
ReaderUtils.tokenToBigInteger(decoratedKey.getToken());
+
+                    // read position & skip promoted index
+                    long pos = ReaderUtils.readPosition(dis);
+                    int promotedIndex = ReaderUtils.skipPromotedIndex(dis);
+                    totalBytesRead += 2 + len + 
VIntCoding.computeUnsignedVIntSize(pos) + promotedIndex;
+
+                    if (prevKey != null && (range == null || 
range.overlaps(prevToken)))
+                    {
+                        // previous key overlaps with range filter, so consume
+                        started = true;
+                        long uncompressed = pos - prevPos;
+                        long compressed = compressionMetadata == null
+                                                ? uncompressed
+                                                : 
calculateCompressedSize(compressionMetadata, dataDbFileLength, prevPos, pos - 
1);
+                        consumer.accept(new IndexEntry(prevKey, prevToken, 
uncompressed, compressed));
+                    }
+                    else if (started)
+                    {
+                        // we have gone past the range we care about, so exit
early
+                        stats.indexBytesSkipped(primaryIndexLength - 
totalBytesRead - skipBytes);
+                        return;
+                    }
+
+                    prevPos = pos;
+                    prevKey = key;
+                    prevToken = token;
+                }
+            }
+            catch (EOFException ignored)
+            {
+                // finished
+            }
+            finally
+            {
+                stats.indexBytesRead(totalBytesRead);
+            }
+
+            if (prevKey != null && (range == null || 
range.overlaps(prevToken)))
+            {
+                // we reached the end of the file, so consume the last key if
it overlaps
+                long end = (compressionMetadata == null ? dataDbFileLength : 
compressionMetadata.getDataLength());
+                long uncompressed = end - prevPos;
+                long compressed = compressionMetadata == null
+                                        ? uncompressed
+                                        : 
calculateCompressedSize(compressionMetadata, dataDbFileLength, prevPos, end - 
1);
+                consumer.accept(new IndexEntry(prevKey, prevToken, 
uncompressed, compressed));
+            }
+        }
+    }
+
+    /**
+     * @param compressionMetadata  SSTable Compression Metadata
+     * @param compressedDataLength full compressed length of the Data.db file
+     * @param start                uncompressed start position.
+     * @param end                  uncompressed end position.
+     * @return the compressed size of a partition, calculated from its uncompressed start
and end offsets in the Data.db file.
+     */
+    public static long calculateCompressedSize(@NotNull CompressionMetadata 
compressionMetadata,
+                                               long compressedDataLength,
+                                               long start,
+                                               long end)
+    {
+        int startIdx = compressionMetadata.chunkIdx(start);
+        int endIdx = compressionMetadata.chunkIdx(end);
+        AbstractCompressionMetadata.Chunk startChunk = 
compressionMetadata.chunkAtIndex(startIdx);
+        long startLen = chunkCompressedLength(startChunk, 
compressedDataLength);
+        // compressed chunk sizes vary, but uncompressed chunk length is the 
same for all chunks
+        long uncompressedChunkLen = compressionMetadata.chunkLength();
+
+        if (startIdx == endIdx)
+        {
+            // within the same chunk, so take % of uncompressed length and 
apply to compressed length
+            float perc = (end - start) / (float) uncompressedChunkLen;
+            return Math.round(perc * startLen);
+        }
+
+        long size = partialCompressedSizeWithinChunk(start, 
uncompressedChunkLen, startLen, true);
+        AbstractCompressionMetadata.Chunk endChunk = 
compressionMetadata.chunkAtIndex(endIdx);
+        long endLen = chunkCompressedLength(endChunk, compressedDataLength);
+
+        size += partialCompressedSizeWithinChunk(end, uncompressedChunkLen, 
endLen, false);
+
+        for (int idx = startIdx + 1; idx < endIdx; idx++)
+        {
+            // add compressed size of whole intermediate chunks
+            size += 
chunkCompressedLength(compressionMetadata.chunkAtIndex(idx), 
compressedDataLength);
+        }
+
+        return size;
+    }
+
+    private static long 
chunkCompressedLength(AbstractCompressionMetadata.Chunk chunk, long 
compressedDataLength)
+    {
+        // chunk.length < 0 means it is the last chunk so use 
compressedDataLength to calculate compressed size
+        return chunk.length >= 0 ? chunk.length : compressedDataLength - 
chunk.offset;
+    }
+
+    /**
+     * Returns the partial compressed size of a partition whose start or end 
overlaps with a compressed chunk.
+     * This is an estimate because of the variable compressibility of 
partitions within the chunk.
+     *
+     * @param uncompressedPos      uncompressed position in Data.db file
+     * @param uncompressedChunkLen fixed size uncompressed chunk size
+     * @param compressedChunkLen   compressed chunk size of this chunk
+     * @param start                true if uncompressedPos is the start position
of the partition, false if it is the end position
+     * @return the estimated compressed size of partition start or end that 
overlaps with this chunk.
+     */
+    public static int partialCompressedSizeWithinChunk(long uncompressedPos,
+                                                       long 
uncompressedChunkLen,
+                                                       long compressedChunkLen,
+                                                       boolean start)
+    {
+        long mod = uncompressedPos % uncompressedChunkLen;
+        // a start position occupies the remaining bytes to the end of the chunk,
an end position occupies the bytes from the start of the chunk
+        long usedBytes = start ? (uncompressedChunkLen - mod) : mod;
+        // percentage of uncompressed bytes that it occupies in the chunk
+        float perc = usedBytes / (float) uncompressedChunkLen;
+        // apply percentage to compressed chunk length to give compressed 
bytes occupied
+        return Math.round(perc * compressedChunkLen);
+    }
+
+    public BigInteger firstToken()
+    {
+        return ssTableRange != null ? ssTableRange.lowerEndpoint() : null;
+    }
+
+    public BigInteger lastToken()
+    {
+        return ssTableRange != null ? ssTableRange.upperEndpoint() : null;
+    }
+
+    public boolean ignore()
+    {
+        return false;
+    }
+}
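
    To make the size estimate concrete (values taken from testCompressedSizeMultipleChunks in
    IndexReaderTests below): for start=128, end=1536, an uncompressed chunk length of 1024, a
    start chunk compressed to 512 bytes and an end chunk compressed to 256 bytes, the start chunk
    contributes (1024 - 128) / 1024 * 512 = 448 bytes, the end chunk contributes
    (1536 % 1024) / 1024 * 256 = 128 bytes, and there are no whole intermediate chunks, so
    calculateCompressedSize() returns 448 + 128 = 576 bytes.
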
diff --git 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
index 35c29b5..e7bbe81 100644
--- 
a/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
+++ 
b/cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java
@@ -498,13 +498,18 @@ public final class ReaderUtils
         return VIntCoding.readUnsignedVInt(dis);
     }
 
-    static void skipPromotedIndex(DataInputStream dis) throws IOException
+    /**
+     * @return the total bytes skipped
+     */
+    public static int skipPromotedIndex(DataInputStream dis) throws IOException
     {
-        int size = (int) VIntCoding.readUnsignedVInt(dis);
+        final long val = VIntCoding.readUnsignedVInt(dis);
+        final int size = (int) val;
         if (size > 0)
         {
             ByteBufferUtils.skipBytesFully(dis, size);
         }
+        return Math.max(size, 0) + VIntCoding.computeUnsignedVIntSize(val);
     }
 
     static List<PartitionKeyFilter> filterKeyInBloomFilter(
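
    A quick worked example of the new skipPromotedIndex() return value (illustrative numbers, not
    from the patch): a promoted index serialized with size 0 reads a single vint byte and returns 1,
    while a promoted index of size 100 skips 100 bytes and returns 101 (100 plus the one-byte vint).
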
diff --git 
a/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/IndexReaderTests.java
 
b/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/IndexReaderTests.java
new file mode 100644
index 0000000..b17b13b
--- /dev/null
+++ 
b/cassandra-four-zero/src/test/java/org/apache/cassandra/spark/reader/IndexReaderTests.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.reader;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.bridge.CassandraBridgeImplementation;
+import org.apache.cassandra.bridge.TokenRange;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.serializers.Int32Serializer;
+import org.apache.cassandra.spark.TestDataLayer;
+import org.apache.cassandra.spark.TestUtils;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.FileType;
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.reader.common.AbstractCompressionMetadata;
+import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
+import org.apache.cassandra.spark.stats.Stats;
+import org.apache.cassandra.spark.utils.TemporaryDirectory;
+import org.apache.cassandra.spark.utils.test.TestSchema;
+
+import static org.apache.cassandra.spark.TestUtils.getFileType;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.arbitrary;
+
+@SuppressWarnings("SameParameterValue")
+public class IndexReaderTests
+{
+    static final ExecutorService EXECUTOR =
+    Executors.newFixedThreadPool(4, new 
ThreadFactoryBuilder().setNameFormat("index-reader-tests-%d")
+                                                              .setDaemon(true)
+                                                              .build());
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IndexReaderTests.class);
+    private static final CassandraBridgeImplementation BRIDGE = new 
CassandraBridgeImplementation();
+
+    @Test
+    public void testPartialCompressedSizeWithinChunk()
+    {
+        assertEquals(64, IndexReader.partialCompressedSizeWithinChunk(0, 1024, 
64, true));
+        assertEquals(32, IndexReader.partialCompressedSizeWithinChunk(512, 
1024, 64, true));
+        assertEquals(16, IndexReader.partialCompressedSizeWithinChunk(768, 
1024, 64, true));
+        assertEquals(2, IndexReader.partialCompressedSizeWithinChunk(992, 
1024, 64, true));
+        assertEquals(2, IndexReader.partialCompressedSizeWithinChunk(995, 
1024, 64, true));
+        assertEquals(1, IndexReader.partialCompressedSizeWithinChunk(1008, 
1024, 64, true));
+        assertEquals(0, IndexReader.partialCompressedSizeWithinChunk(1023, 
1024, 64, true));
+        assertEquals(64, IndexReader.partialCompressedSizeWithinChunk(1024, 
1024, 64, true));
+        assertEquals(64, IndexReader.partialCompressedSizeWithinChunk(2048, 
1024, 64, true));
+        assertEquals(32, IndexReader.partialCompressedSizeWithinChunk(2560, 
1024, 64, true));
+
+        assertEquals(0, IndexReader.partialCompressedSizeWithinChunk(0, 1024, 
64, false));
+        assertEquals(1, IndexReader.partialCompressedSizeWithinChunk(16, 1024, 
64, false));
+        assertEquals(32, IndexReader.partialCompressedSizeWithinChunk(512, 
1024, 64, false));
+        assertEquals(64, IndexReader.partialCompressedSizeWithinChunk(1023, 
1024, 64, false));
+        assertEquals(32, IndexReader.partialCompressedSizeWithinChunk(2560, 
1024, 64, false));
+    }
+
+    @Test
+    public void testCompressedSizeWithinSameChunk()
+    {
+        // within the same chunk
+        assertEquals(64, calculateCompressedSize(128, 256, 0, 512));
+        assertEquals(128, calculateCompressedSize(0, 1024, 5, 128));
+        assertEquals(25, calculateCompressedSize(32, 64, 5, 800));
+    }
+
+    @Test
+    public void testCompressedSizeMultipleChunks()
+    {
+        // partition straddles more than one chunk
+        assertEquals(448 + 128, calculateCompressedSize(128, 0, 512, 1536, 1, 
256));
+        assertEquals(112 + (256 * 10) + 32, calculateCompressedSize(128, 0, 
128, 11392, 11, 256));
+    }
+
+    private static long calculateCompressedSize(long start, long end, int 
startIdx, int startCompressedChunkSize)
+    {
+        return IndexReader.calculateCompressedSize(mockMetaData(start, 
startIdx, startCompressedChunkSize, end), 160000000, start, end);
+    }
+
+    private static long calculateCompressedSize(long start, int startIdx, int 
startCompressedChunkSize,
+                                                long end, int endIdx, int 
endCompressedChunkSize)
+    {
+        return IndexReader.calculateCompressedSize(
+        mockMetaData(start, startIdx, startCompressedChunkSize, end, endIdx, 
endCompressedChunkSize), 160000000, start, end
+        );
+    }
+
+    private static CompressionMetadata mockMetaData(long start, int startIdx, 
int startCompressedChunkSize, long end)
+    {
+        return mockMetaData(start, startIdx, startCompressedChunkSize, end, 
startIdx, startCompressedChunkSize);
+    }
+
+    private static CompressionMetadata mockMetaData(long start, int startIdx, 
int startCompressedChunkSize,
+                                                    long end, int endIdx, int 
endCompressedChunkSize)
+    {
+        return mockMetaData(start, startIdx, startCompressedChunkSize, end, 
endIdx, endCompressedChunkSize, 1024);
+    }
+
+    private static CompressionMetadata mockMetaData(long start, int startIdx, 
int startCompressedChunkSize,
+                                                    long end, int endIdx, int 
endCompressedChunkSize,
+                                                    int 
uncompressedChunkLength)
+    {
+        CompressionMetadata metadata = mock(CompressionMetadata.class);
+        when(metadata.chunkIdx(eq(start))).thenReturn(startIdx);
+        when(metadata.chunkIdx(eq(end))).thenReturn(endIdx);
+        when(metadata.chunkLength()).thenReturn(uncompressedChunkLength);
+        when(metadata.chunkAtIndex(eq(startIdx))).thenReturn(new 
AbstractCompressionMetadata.Chunk(0, startCompressedChunkSize));
+        when(metadata.chunkAtIndex(eq(endIdx))).thenReturn(new 
AbstractCompressionMetadata.Chunk(0, endCompressedChunkSize));
+        for (int idx = startIdx + 1; idx < endIdx; idx++)
+        {
+            // let intermediate chunks have same compressed size as end
+            when(metadata.chunkAtIndex(eq(idx))).thenReturn(new 
AbstractCompressionMetadata.Chunk(0, endCompressedChunkSize));
+        }
+        return metadata;
+    }
+
+    @Test
+    public void testIndexReaderWithCompression()
+    {
+        testIndexReader(true);
+    }
+
+    @Test
+    public void testIndexReaderWithoutCompression()
+    {
+        testIndexReader(false);
+    }
+
+    private static void testIndexReader(boolean withCompression)
+    {
+        qt().forAll(arbitrary().enumValues(Partitioner.class))
+            .checkAssert(partitioner -> {
+                try (TemporaryDirectory directory = new TemporaryDirectory())
+                {
+                    Path dir = directory.path();
+                    int numPartitions = 50000;
+                    BigInteger eighth = 
partitioner.maxToken().divide(BigInteger.valueOf(8));
+                    SparkRangeFilter rangeFilter = SparkRangeFilter.create(
+                    TokenRange.closed(partitioner.minToken().add(eighth),
+                                      partitioner.maxToken().subtract(eighth))
+                    );
+                    TestSchema schema = TestSchema.builder()
+                                                        .withPartitionKey("a", 
BRIDGE.aInt())
+                                                        .withColumn("b", 
BRIDGE.blob())
+                                                        
.withCompression(withCompression)
+                                                        .build();
+                    CqlTable table = schema.buildTable();
+                    TableMetadata metaData = new 
SchemaBuilder(schema.createStatement, schema.keyspace, schema.rf, 
partitioner).tableMetaData();
+
+                    // write an SSTable
+                    Map<Integer, Integer> expected = new HashMap<>();
+                    schema.writeSSTable(dir, BRIDGE, partitioner, (writer) -> {
+                        for (int i = 0; i < numPartitions; i++)
+                        {
+                            BigInteger token = ReaderUtils.tokenToBigInteger(
+                            
metaData.partitioner.decorateKey(Int32Serializer.instance.serialize(i)).getToken()
+                            );
+                            byte[] lowEntropyData = 
TestUtils.randomLowEntropyData();
+                            if (rangeFilter.overlaps(token))
+                            {
+                                expected.put(i, lowEntropyData.length);
+                            }
+                            writer.write(i, ByteBuffer.wrap(lowEntropyData));
+                        }
+                    });
+                    assertFalse(expected.isEmpty());
+                    assertTrue(expected.size() < numPartitions);
+
+                    TestDataLayer dataLayer = new TestDataLayer(BRIDGE, 
getFileType(dir, FileType.DATA).collect(Collectors.toList()), table);
+                    List<SSTable> ssTables = 
dataLayer.listSSTables().collect(Collectors.toList());
+                    assertFalse(ssTables.isEmpty());
+                    AtomicReference<Throwable> error = new AtomicReference<>();
+                    CountDownLatch latch = new CountDownLatch(ssTables.size());
+                    AtomicInteger rowCount = new AtomicInteger(0);
+
+                    IndexConsumer consumer = new IndexConsumer()
+                    {
+                        public void onFailure(Throwable t)
+                        {
+                            LOGGER.warn("Error reading index file", t);
+                            if (error.get() == null)
+                            {
+                                error.compareAndSet(null, t);
+                            }
+                        }
+
+                        public void onFinished(long runtimeNanos)
+                        {
+                            latch.countDown();
+                        }
+
+                        public void accept(IndexEntry indexEntry)
+                        {
+                            // we should only read in-range partition keys
+                            rowCount.getAndIncrement();
+                            int pk = indexEntry.partitionKey.getInt();
+                            int blobSize = expected.get(pk);
+                            assertTrue(expected.containsKey(pk));
+                            assertTrue(indexEntry.compressed > 0);
+                            assertTrue(withCompression
+                                       ? indexEntry.compressed < 
indexEntry.uncompressed * 0.1
+                                       : indexEntry.compressed == 
indexEntry.uncompressed);
+                            assertTrue((int) indexEntry.uncompressed > 
blobSize);
+                            // uncompressed size should be proportional to the 
blob size, with some serialization overhead
+                            assertTrue(((int) indexEntry.uncompressed - 
blobSize) < 40);
+                        }
+                    };
+
+                    ssTables
+                    .forEach(ssTable -> CompletableFuture.runAsync(
+                             () -> new IndexReader(ssTable, metaData, 
rangeFilter, Stats.DoNothingStats.INSTANCE, consumer), EXECUTOR)
+                    );
+
+                    try
+                    {
+                        latch.await();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    assertNull(error.get());
+                    assertEquals(expected.size(), rowCount.get());
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            });
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
