This is an automated email from the ASF dual-hosted git repository.

lukasz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e5bfe221 CASSANALYTICS-121: Assign data file start offset based on BTI 
index
e5bfe221 is described below

commit e5bfe221dfa58c6e4882485468d1a84e6e0f3d68
Author: Lukasz Antoniak <[email protected]>
AuthorDate: Mon Feb 16 11:45:34 2026 +0100

    CASSANALYTICS-121: Assign data file start offset based on BTI index
    
    Patched by Lukasz Antoniak; Reviewed by Yifan Cai for CASSANALYTICS-121
---
 CHANGES.txt                                        |  1 +
 .../io/sstable/format/bti/BtiReaderUtils.java      | 64 +++++++++++++++++++++-
 .../cassandra/spark/reader/SSTableReader.java      | 24 ++++++--
 .../cassandra/spark/reader/IndexOffsetTests.java   | 15 ++++-
 .../cassandra/spark/reader/IndexOffsetTests.java   | 15 ++++-
 .../org/apache/cassandra/utils/TokenUtils.java     | 15 +++++
 6 files changed, 121 insertions(+), 13 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7588c8b8..6b973c39 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Assign data file start offset based on BTI index (CASSANALYTICS-121)
  * Quote identifiers option must be set to true if ttl has mixed case column 
name (CASSANALYTICS-120)
  * Fix ByteBuffer flip() in StreamBuffer.copyBytes() causing data corruption 
(CASSANALYTICS-116)
  * Fix race condition in DirectStreamSession#onSSTablesProduced and 
SortedSStableWriter#close (CASSANALYTICS-107)
diff --git 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java
 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java
index 367a41b7..444610bf 100644
--- 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java
+++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/io/sstable/format/bti/BtiReaderUtils.java
@@ -24,19 +24,28 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import com.google.common.collect.ImmutableSet;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.bridge.TokenRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.io.util.FileHandle;
@@ -53,6 +62,7 @@ import 
org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
 import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
 import org.apache.cassandra.utils.FilterFactory;
+import org.apache.cassandra.utils.TokenUtils;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -60,6 +70,8 @@ import static 
org.apache.cassandra.spark.reader.BigIndexReader.calculateCompress
 
 public class BtiReaderUtils
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BtiReaderUtils.class);
+
     private static final Set<Component> indexComponents = 
ImmutableSet.of(BtiFormat.Components.DATA,
                                                                           
BtiFormat.Components.PARTITION_INDEX,
                                                                           
BtiFormat.Components.ROW_INDEX);
@@ -109,6 +121,53 @@ public class BtiReaderUtils
         return exists.get();
     }
 
+    @Nullable
+    public static Long startOffsetInDataFile(@NotNull SSTable ssTable,
+                                             @NotNull TableMetadata metadata,
+                                             @NotNull Descriptor descriptor,
+                                             @NotNull TokenRange tokenRange)
+    {
+        final AtomicReference<Long> offset = new AtomicReference<>(null);
+
+        Token tokenStart = TokenUtils.bigIntegerToToken(metadata.partitioner, 
tokenRange.lowerEndpoint());
+        Token tokenEnd = TokenUtils.bigIntegerToToken(metadata.partitioner, 
tokenRange.upperEndpoint());
+        Range<Token> range = new Range<>(tokenStart, tokenEnd);
+
+        try
+        {
+            withPartitionIndex(ssTable, descriptor, metadata, true, false, 
(dataFileHandle, partitionFileHandle, rowFileHandle, partitionIndex) -> {
+                TableMetadataRef metadataRef = 
TableMetadataRef.forOfflineTools(metadata);
+                BtiTableReader btiTableReader = new 
BtiTableReader.Builder(descriptor)
+                                                .setDataFile(dataFileHandle)
+                                                
.setPartitionIndex(partitionIndex)
+                                                .setComponents(indexComponents)
+                                                
.setTableMetadataRef(metadataRef)
+                                                
.setFilter(FilterFactory.AlwaysPresent)
+                                                .build(null, false, false);
+                try
+                {
+                    List<SSTableReader.PartitionPositionBounds> positions =
+                            
btiTableReader.getPositionsForRanges(Collections.singletonList(range));
+                    if (!positions.isEmpty())
+                    {
+                        // we should receive zero or one position
+                        offset.set(positions.get(0).lowerPosition);
+                    }
+                }
+                finally
+                {
+                    btiTableReader.selfRef().release();
+                }
+            });
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Failed to lookup start offset for token range {} in 
sstable {}",
+                        tokenRange, ssTable, e);
+        }
+        return offset.get();
+    }
+
     public static void consumePrimaryIndex(@NotNull SSTable ssTable,
                                            @NotNull TableMetadata metadata,
                                            @NotNull Descriptor descriptor,
@@ -240,9 +299,8 @@ public class BtiReaderUtils
                                            @NotNull BtiPartitionIndexConsumer 
consumer) throws IOException
     {
         File file = new File(sstable.getDataFileName());
-        CompressionMetadata compression = getCompressionMetadata(sstable, 
crcCheckChance, descriptor);
-
-        try (FileHandle dataFileHandle = loadDataFile ? createFileHandle(file,
+        try (CompressionMetadata compression = getCompressionMetadata(sstable, 
crcCheckChance, descriptor);
+             FileHandle dataFileHandle = loadDataFile ? createFileHandle(file,
                                                                          
sstable.openDataStream(),
                                                                          
sstable.length(FileType.DATA),
                                                                          
compression) : null;
diff --git 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
index 3dbe44d1..cd8be1f0 100644
--- 
a/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
+++ 
b/cassandra-five-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/SSTableReader.java
@@ -65,6 +65,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.bti.BtiReaderUtils;
 import org.apache.cassandra.io.sstable.indexsummary.IndexSummary;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
@@ -429,11 +430,26 @@ public class SSTableReader implements SparkSSTableReader, 
Scannable
                                                 buildColumnFilter(metadata, 
columnFilter));
         this.metadata = metadata;
 
-        if (readIndexOffset && summary != null)
+        if (readIndexOffset)
         {
-            SummaryDbUtils.Summary finalSummary = summary;
-            extractRange(sparkRangeFilter, partitionKeyFilters)
-                    .ifPresent(range -> readOffsets(finalSummary.summary(), 
range));
+            if (summary != null)
+            {
+                // BIG format
+                SummaryDbUtils.Summary finalSummary = summary;
+                extractRange(sparkRangeFilter, partitionKeyFilters)
+                        .ifPresent(range -> 
readOffsets(finalSummary.summary(), range));
+            }
+            else
+            {
+                // BTI format
+                extractRange(sparkRangeFilter, partitionKeyFilters)
+                        .ifPresent(range -> {
+                            startOffset = 
BtiReaderUtils.startOffsetInDataFile(ssTable,
+                                                                               
this.metadata,
+                                                                               
descriptor,
+                                                                               
range);
+                        });
+            }
         }
         else
         {
diff --git 
a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
 
b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
index b2b7751f..316a9a1c 100644
--- 
a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
+++ 
b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,8 @@ public class IndexOffsetTests
                     LOGGER.info("Testing index offsets numKeys={} 
sparkPartitions={} partitioner={} enableCompression={}",
                                 numKeys, ranges.size(), partitioner.name(), 
enableCompression);
 
-                    MutableInt skipped = new MutableInt(0);
+                    MutableInt skippedPartitions = new MutableInt(0);
+                    MutableLong skippedDataOffsets = new MutableLong(0);
                     int[] counts = new int[numKeys];
                     for (TokenRange range : ranges)
                     {
@@ -120,7 +122,12 @@ public class IndexOffsetTests
                                                             {
                                                                 public void 
skippedPartition(ByteBuffer key, BigInteger token)
                                                                 {
-                                                                    
skipped.add(1);
+                                                                    
skippedPartitions.add(1);
+                                                                }
+
+                                                                public void 
skippedDataDbStartOffset(long length)
+                                                                {
+                                                                    
skippedDataOffsets.add(length);
                                                                 }
                                                             })
                                                             .build();
@@ -180,8 +187,10 @@ public class IndexOffsetTests
                         index++;
                     }
 
+                    
assertThat(skippedDataOffsets.longValue()).isGreaterThan(0);
+
                     LOGGER.info("Success skippedKeys={} partitioner={}",
-                                skipped.intValue(), partitioner.name());
+                                skippedPartitions.intValue(), 
partitioner.name());
                 }
                 catch (IOException exception)
                 {
diff --git 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
index b2b7751f..316a9a1c 100644
--- 
a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
+++ 
b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/IndexOffsetTests.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import com.google.common.collect.ImmutableMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang.mutable.MutableLong;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -110,7 +111,8 @@ public class IndexOffsetTests
                     LOGGER.info("Testing index offsets numKeys={} 
sparkPartitions={} partitioner={} enableCompression={}",
                                 numKeys, ranges.size(), partitioner.name(), 
enableCompression);
 
-                    MutableInt skipped = new MutableInt(0);
+                    MutableInt skippedPartitions = new MutableInt(0);
+                    MutableLong skippedDataOffsets = new MutableLong(0);
                     int[] counts = new int[numKeys];
                     for (TokenRange range : ranges)
                     {
@@ -120,7 +122,12 @@ public class IndexOffsetTests
                                                             {
                                                                 public void 
skippedPartition(ByteBuffer key, BigInteger token)
                                                                 {
-                                                                    
skipped.add(1);
+                                                                    
skippedPartitions.add(1);
+                                                                }
+
+                                                                public void 
skippedDataDbStartOffset(long length)
+                                                                {
+                                                                    
skippedDataOffsets.add(length);
                                                                 }
                                                             })
                                                             .build();
@@ -180,8 +187,10 @@ public class IndexOffsetTests
                         index++;
                     }
 
+                    
assertThat(skippedDataOffsets.longValue()).isGreaterThan(0);
+
                     LOGGER.info("Success skippedKeys={} partitioner={}",
-                                skipped.intValue(), partitioner.name());
+                                skippedPartitions.intValue(), 
partitioner.name());
                 }
                 catch (IOException exception)
                 {
diff --git 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java
 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java
index beef1aa8..da84bca4 100644
--- 
a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java
+++ 
b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/utils/TokenUtils.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.utils;
 
 import java.math.BigInteger;
 
+import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
@@ -46,6 +47,20 @@ public class TokenUtils
         throw new UnsupportedOperationException("Unexpected token type: " + 
token.getClass().getName());
     }
 
+    public static Token bigIntegerToToken(IPartitioner partitioner, BigInteger 
token)
+    {
+        if (partitioner instanceof Murmur3Partitioner)
+        {
+            return new Murmur3Partitioner.LongToken(token.longValue());
+        }
+        if (partitioner instanceof RandomPartitioner)
+        {
+            return new RandomPartitioner.BigIntegerToken(token);
+        }
+
+        throw new UnsupportedOperationException("Unexpected partitioner type: 
" + partitioner.getClass().getName());
+    }
+
     public static long tokenToLong(final Token token)
     {
         if (token instanceof Murmur3Partitioner.LongToken)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to