This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new 7a8335c273 Optimize initial skipping logic for SAI queries on large
partitions
7a8335c273 is described below
commit 7a8335c2739c207b77e90c05897285b3cbaba166
Author: Sunil Ramchandra Pawar <[email protected]>
AuthorDate: Thu May 8 17:12:18 2025 +0530
Optimize initial skipping logic for SAI queries on large partitions
patch by Sunil Ramchandra Pawar; reviewed by Caleb Rackliffe and David
Capwell for CASSANDRA-20191
---
CHANGES.txt | 1 +
.../sai/plan/StorageAttachedIndexSearcher.java | 58 +++-
.../index/sai/cql/IntraPartitionSkippingTest.java | 318 +++++++++++++++++++++
3 files changed, 375 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a6dbe7ae5..c073719105 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0.5
+ * Optimize initial skipping logic for SAI queries on large partitions
(CASSANDRA-20191)
* Fix reading mmapped trie-index exceeding 2GiB (CASSANDRA-20351)
* zero copy streaming allocates direct memory that isn't used, but does help
to fragment the memory space (CASSANDRA-20577)
* CQLSSTableWriter supports setting the format (BTI or Big) (CASSANDRA-20609)
diff --git
a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
index 9116db0d31..20a9cad58c 100644
---
a/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
+++
b/src/java/org/apache/cassandra/index/sai/plan/StorageAttachedIndexSearcher.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.index.sai.plan;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -32,6 +33,8 @@ import javax.annotation.Nullable;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringBound;
+import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
@@ -39,6 +42,10 @@ import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.Slices;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -138,6 +145,7 @@ public class StorageAttachedIndexSearcher implements
Index.Searcher
private final PrimaryKey firstPrimaryKey;
private final PrimaryKey lastPrimaryKey;
private final Iterator<DataRange> keyRanges;
+ private final DataRange firstDataRange;
private AbstractBounds<PartitionPosition> currentKeyRange;
private final KeyRangeIterator resultKeyIterator;
@@ -152,7 +160,8 @@ public class StorageAttachedIndexSearcher implements
Index.Searcher
private ResultRetriever(ReadExecutionController executionController,
boolean topK)
{
this.keyRanges = queryController.dataRanges().iterator();
- this.currentKeyRange = keyRanges.next().keyRange();
+ this.firstDataRange = keyRanges.next();
+ this.currentKeyRange = firstDataRange.keyRange();
this.resultKeyIterator = Operation.buildIterator(queryController);
this.filterTree = Operation.buildFilter(queryController,
queryController.usesStrictFiltering());
this.executionController = executionController;
@@ -175,7 +184,52 @@ public class StorageAttachedIndexSearcher implements
Index.Searcher
// We can't put this code in the constructor because it may throw
and the caller
// may not be prepared for that.
if (lastKey == null)
- resultKeyIterator.skipTo(firstPrimaryKey);
+ {
+ PrimaryKey skipTarget = firstPrimaryKey;
+ ClusteringComparator comparator =
command.metadata().comparator;
+
+ // If there are no clusterings, the first data range selects
an entire partition, or we have static
+ // expressions, don't bother trying to skip forward within the
partition.
+ if (comparator.size() > 0 &&
!firstDataRange.selectsAllPartition() &&
!command.rowFilter().hasStaticExpression())
+ {
+ // Only attempt to skip if the first data range covers a
single partition.
+ if (currentKeyRange.left.equals(currentKeyRange.right) &&
currentKeyRange.left instanceof DecoratedKey)
+ {
+ DecoratedKey decoratedKey = (DecoratedKey)
currentKeyRange.left;
+ ClusteringIndexFilter filter =
firstDataRange.clusteringIndexFilter(decoratedKey);
+
+ if (filter instanceof ClusteringIndexSliceFilter)
+ {
+ Slices slices = ((ClusteringIndexSliceFilter)
filter).requestedSlices();
+
+ if (!slices.isEmpty())
+ {
+ ClusteringBound<?> startBound =
slices.get(0).start();
+
+ if (!startBound.isEmpty())
+ {
+ ByteBuffer[] rawValues =
startBound.getBufferArray();
+
+ if (rawValues.length == comparator.size())
+ skipTarget =
keyFactory.create(decoratedKey, Clustering.make(rawValues));
+ }
+ }
+ }
+ else if (filter instanceof ClusteringIndexNamesFilter)
+ {
+ ClusteringIndexNamesFilter namesFilter =
(ClusteringIndexNamesFilter) filter;
+
+ if (!namesFilter.requestedRows().isEmpty())
+ {
+ Clustering<?> skipClustering =
namesFilter.requestedRows().iterator().next();
+ skipTarget = keyFactory.create(decoratedKey,
skipClustering);
+ }
+ }
+ }
+ }
+
+ resultKeyIterator.skipTo(skipTarget);
+ }
// Theoretically we wouldn't need this if the caller of
computeNext always ran the
// returned iterators to the completion. Unfortunately, we have no
control over the caller behavior here.
diff --git
a/test/unit/org/apache/cassandra/index/sai/cql/IntraPartitionSkippingTest.java
b/test/unit/org/apache/cassandra/index/sai/cql/IntraPartitionSkippingTest.java
new file mode 100644
index 0000000000..b9e42640e7
--- /dev/null
+++
b/test/unit/org/apache/cassandra/index/sai/cql/IntraPartitionSkippingTest.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.sai.cql;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.HdrHistogram.Histogram;
+import org.apache.cassandra.index.sai.SAITester;
+
+/**
+ * Tests for verifying intra-partition and partition-level skipping
optimizations
+ * introduced in CASSANDRA-20191 for SAI.
+ * <p>
+ * These tests validate that Cassandra can efficiently skip over rows
+ * within a partition using clustering filters (name and slice), paging,
reversed order,
+ * and sparse matches.
+ * <p>
+ * Each test documents a scenario where skipping logic is expected to apply,
along with a few where it is not expected to skip.
+ */
+public class IntraPartitionSkippingTest extends SAITester
+{
+ @Test
+ public void testNameFilterExactMatch() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck int, val text, PRIMARY KEY
(pk, ck))");
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ for (int ck = 0; ck < 10; ck++)
+ {
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck,
"val" + ck);
+ }
+
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
pk = 1 AND ck = 5 AND val = 'val5' ALLOW FILTERING"),
+ row(1, 5,"val5")));
+ }
+
+ @Test
+ public void testSliceFilterRangeMatch() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck int, val text, PRIMARY KEY
(pk, ck))");
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ for (int ck = 0; ck < 100; ck++)
+ {
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck,
"val" + ck);
+ }
+
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
pk = 1 AND ck > 90 AND val = 'val99' ALLOW FILTERING"),
+ row(1, 99,"val99")));
+ }
+
+ @Test
+ public void testReversedClustering() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck int, val text, PRIMARY KEY
(pk, ck)) WITH CLUSTERING ORDER BY (ck DESC)");
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ for (int ck = 0; ck < 20; ck++)
+ {
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck,
"val" + ck);
+ }
+
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
pk = 1 AND ck < 10 AND val = 'val5' ALLOW FILTERING"),
+ row(1,5,"val5")));
+ }
+
+ @Test
+ public void testSkippingWithPaging() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck int, val int, PRIMARY KEY
(pk, ck))");
+
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ for (int ck = 0; ck < 100; ck++)
+ {
+ int val = 1000 + ck;
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck,
val);
+ }
+
+ beforeAndAfterFlush(() -> assertRowsNet(executeNetWithPaging("SELECT *
FROM %s WHERE pk = 1 AND ck > 90 AND val > 1090 ALLOW FILTERING", 5),
+ row(1, 91, 1091),
+ row(1, 92, 1092),
+ row(1, 93, 1093),
+ row(1, 94, 1094),
+ row(1, 95, 1095),
+ row(1, 96, 1096),
+ row(1, 97, 1097),
+ row(1, 98, 1098),
+ row(1, 99, 1099)));
+ }
+
+ @Test
+ public void testCompositeClusteringKeySkipping() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck1 int, ck2 int, val text,
PRIMARY KEY (pk, ck1, ck2))");
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ for (int ck1 = 0; ck1 < 10; ck1++)
+ for (int ck2 = 0; ck2 < 10; ck2++)
+ execute("INSERT INTO %s (pk, ck1, ck2, val) VALUES (?, ?, ?,
?)", 1, ck1, ck2, "v" + (ck1*10+ck2));
+
+
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
pk = 1 AND ck1 = 9 AND ck2 = 9 AND val = 'v99' ALLOW FILTERING"),
+ row(1,9,9,"v99")));
+
+ }
+
+ @Test
+ public void testSparseMatch() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck int, val text, PRIMARY KEY
(pk, ck))");
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ for (int ck = 0; ck < 1000; ck++)
+ {
+ String value = (ck % 450 == 0) ? "insert" : "skip";
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck,
value);
+ }
+
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
pk = 1 AND ck > 899 AND val = 'insert' ALLOW FILTERING"),
+ row(1,900,"insert")));
+
+ }
+
+ @Test
+ public void testMultipleNameFilters() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck int, val text, PRIMARY KEY
(pk, ck))");
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ for (int i = 0; i < 20; i++)
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, i,
"v5");
+
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
pk = 1 AND ck IN (5, 10, 15) AND val = 'v5' ALLOW FILTERING"),
+ row(1,5,"v5"), row(1,10,"v5"),
row(1,15,"v5")));
+
+ }
+
+ // Multiple partition range scans won't skip
+ @Test
+ public void testPartitionRangeSkipping() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck int, val text, PRIMARY KEY
(pk, ck))");
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ for (int pk = 0; pk < 10; pk++)
+ for (int ck = 0; ck < 5; ck++)
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", pk,
ck, "value" + pk);
+
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
val = 'value9' AND ck > 2 ALLOW FILTERING"),
+ row(9,3,"value9"),
row(9,4,"value9")));
+
+ }
+
+ @Test
+ public void testStaticColumns() throws Throwable
+ {
+ createTable("CREATE TABLE %S (pk int, ck int, s text static, val text,
PRIMARY KEY (pk, ck))");
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ execute("INSERT INTO %s (pk, s) VALUES (?, ?)", 1, "static1");
+
+ for (int ck = 0; ck < 200; ck++)
+ {
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", 1, ck,
"val" + ck);
+ }
+
+
+ // We will not skip
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
pk = 1 AND ck > 100 AND s = 'static1' AND val = 'val101' ALLOW FILTERING"),
+ row(1,101,"static1","val101")));
+
+ // we will skip
+ beforeAndAfterFlush(() -> assertRows(execute("SELECT * FROM %s WHERE
pk = 1 AND ck > 100 AND val = 'val101' ALLOW FILTERING"),
+ row(1,101,"static1","val101")));
+ }
+
+ @Test
+ public void testNextKeyClusteringIndexNamesFilter() throws Throwable
+ {
+ createTable("CREATE TABLE %S (" +
+ "pk int," +
+ "ck int," +
+ "v int," +
+ "PRIMARY KEY (pk, ck))");
+
+ createIndex("CREATE INDEX ON %s(v) USING 'sai'");
+
+ int pk = 1;
+ for (int ck = 0; ck < 10; ck++)
+ {
+ int v = ck + 1000;
+ execute("INSERT INTO %s (pk, ck, v) VALUES (?, ?, ?)", pk, ck, v);
+ }
+
+ int pk1 = 2;
+ for (int ck = 0; ck < 100; ck++)
+ {
+ execute("INSERT INTO %s (pk, ck, v) VALUES (?, ?, ?)", pk1, ck,
ck);
+ }
+
+ beforeAndAfterFlush(() -> {
+ assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck = 5 AND v
> 1004 ALLOW FILTERING"),
+ row(1, 5, 1005));
+
+ assertRows(execute("SELECT * FROM %s WHERE pk = 1 AND ck = 5 AND v
> 1004 AND v < 20000 ALLOW FILTERING"),
+ row(1, 5, 1005));
+ });
+
+
+ }
+
+ // Performance test cases; these can be ignored.
+ @Ignore ("performance test case for Index Slice filter.")
+ @Test
+ public void testNextKeyPerfClusteringIndexSliceFilter()
+ {
+ createTable("CREATE TABLE %S (" +
+ "pk int, " +
+ "ck int, " +
+ "val text, " +
+ "PRIMARY KEY (pk, ck))");
+
+ createIndex("CREATE INDEX ON %s(val) USING 'sai'");
+
+ int pk = 1;
+ for (int ck = 0; ck < 10000; ck++)
+ {
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", pk, ck,
"hello1");
+ }
+
+ int pk1 = 2;
+ for (int ck = 0; ck < 100; ck++)
+ {
+ execute("INSERT INTO %s (pk, ck, val) VALUES (?, ?, ?)", pk1, ck,
"hello2");
+ }
+
+ Histogram histogram = new Histogram(4);
+
+
+ for (int i = 0; i < 10000; i++)
+ {
+ long start = System.nanoTime();
+ execute("SELECT * FROM %s WHERE pk = 1 AND ck > 9000 AND val =
'hello1' ALLOW FILTERING");
+ histogram.recordValue(System.nanoTime() - start);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("50th: " +
histogram.getValueAtPercentile(0.5));
+ System.out.println("95th: " +
histogram.getValueAtPercentile(0.95));
+ System.out.println("99th: " +
histogram.getValueAtPercentile(0.99));
+ }
+ }
+
+ }
+
+
+ @Ignore ("performance test case for Index Names filter.")
+ @Test
+ public void testNextKeyPerfClusteringIndexNamesFilter()
+ {
+ createTable("CREATE TABLE %S (" +
+ "pk int," +
+ "ck int," +
+ "v int," +
+ "PRIMARY KEY (pk, ck))");
+
+ createIndex("CREATE INDEX ON %s(v) USING 'sai'");
+
+ int pk = 1;
+ for (int ck = 0; ck < 20000; ck++)
+ {
+ int v = ck + 10;
+ execute("INSERT INTO %s (pk, ck, v) VALUES (?, ?, ?)", pk, ck, v);
+ }
+
+ int pk1 = 2;
+ for (int ck = 0; ck < 100; ck++)
+ {
+ execute("INSERT INTO %s (pk, ck, v) VALUES (?, ?, ?)", pk1, ck,
ck);
+ }
+
+ Histogram histogram = new Histogram(4);
+
+ for (int i = 0; i < 10000; i++)
+ {
+ long start = System.nanoTime();
+ execute("SELECT * FROM %s WHERE pk = 1 AND ck = 15000 AND v > 9000
ALLOW FILTERING");
+ histogram.recordValue(System.nanoTime() - start);
+
+ if (i % 1000 == 0)
+ {
+ System.out.println("50th: " +
histogram.getValueAtPercentile(0.5));
+ System.out.println("95th: " +
histogram.getValueAtPercentile(0.95));
+ System.out.println("99th: " +
histogram.getValueAtPercentile(0.99));
+ }
+ }
+
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]