This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 11ce51cd3a [core] The TopN predicate for range-bitmap supports
multiple sort keys (#6209)
11ce51cd3a is described below
commit 11ce51cd3a71e90e654cb26c83d471db4642efa3
Author: Glenn <[email protected]>
AuthorDate: Mon Sep 8 13:17:10 2025 +0800
[core] The TopN predicate for range-bitmap supports multiple sort keys
(#6209)
---
docs/content/concepts/spec/fileindex.md | 4 ++
.../bitmap/RangeBitmapIndexPushDownBenchmark.java | 46 ++++++++++++++++
.../paimon/fileindex/FileIndexPredicate.java | 7 +--
.../fileindex/rangebitmap/BitSliceIndexBitmap.java | 19 +++++++
.../rangebitmap/RangeBitmapFileIndex.java | 10 +---
.../java/org/apache/paimon/predicate/TopN.java | 7 ++-
.../apache/paimon/spark/PaimonScanBuilder.scala | 64 +++++++++++-----------
.../paimon/spark/sql/PaimonPushDownTestBase.scala | 43 ++++++++++++++-
8 files changed, 151 insertions(+), 49 deletions(-)
diff --git a/docs/content/concepts/spec/fileindex.md
b/docs/content/concepts/spec/fileindex.md
index 5533bb4d64..a02d257021 100644
--- a/docs/content/concepts/spec/fileindex.md
+++ b/docs/content/concepts/spec/fileindex.md
@@ -267,6 +267,10 @@ For now, the `TOPN` predicate optimization can not using
with other predicates,
SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score ASC LIMIT 10;
SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score DESC LIMIT 10;
+
+-- If there are multiple sort keys, the first sort key column must have a range-bitmap index.
+SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score ASC, col DESC LIMIT
10;
+SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score DESC, col ASC LIMIT
10;
```
<pre>
diff --git
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
index c286b8727d..04b3306c1f 100644
---
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
+++
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
@@ -32,6 +32,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.SortValue;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
@@ -50,6 +51,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@@ -59,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
/** Benchmark for table read. */
@@ -92,6 +95,7 @@ public class RangeBitmapIndexPushDownBenchmark {
// benchmark TopN
benchmarkTopN(tables, bound, 1);
+ benchmarkMultipleTopN(tables, bound, 1);
}
}
@@ -233,6 +237,48 @@ public class RangeBitmapIndexPushDownBenchmark {
benchmark.run();
}
+ private void benchmarkMultipleTopN(Map<String, Table> tables, int bound,
int k) {
+ Benchmark benchmark =
+ new Benchmark("multiple-topn", ROW_COUNT)
+ .setNumWarmupIters(1)
+ .setOutputPerIteration(false);
+ for (String name : tables.keySet()) {
+ benchmark.addCase(
+ name + "-" + bound + "-" + k,
+ 1,
+ () -> {
+ Table table = tables.get(name);
+ List<SortValue> orders =
+ Arrays.asList(
+ new SortValue(
+ new FieldRef(0, "k",
DataTypes.INT()),
+ DESCENDING,
+ NULLS_LAST),
+ new SortValue(
+ new FieldRef(1, "f1",
DataTypes.STRING()),
+ ASCENDING,
+ NULLS_LAST));
+ TopN topN = new TopN(orders, k);
+ List<Split> splits =
table.newReadBuilder().newScan().plan().splits();
+ AtomicLong readCount = new AtomicLong(0);
+ try {
+ for (Split split : splits) {
+ RecordReader<InternalRow> reader =
+ table.newReadBuilder()
+ .withTopN(topN)
+ .newRead()
+ .createReader(split);
+ reader.forEachRemaining(row ->
readCount.incrementAndGet());
+ reader.close();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ benchmark.run();
+ }
+
private Table prepareData(int bound, Options options, String tableName)
throws Exception {
Table table = createTable(options, tableName);
StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index 90868224d7..8b93da7444 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -92,12 +92,6 @@ public class FileIndexPredicate implements Closeable {
return result;
}
- // for now we only support single column.
- List<SortValue> orders = topN.orders();
- if (orders.size() != 1) {
- return result;
- }
-
int k = topN.limit();
if (result instanceof BitmapIndexResult) {
long cardinality = ((BitmapIndexResult)
result).get().getCardinality();
@@ -106,6 +100,7 @@ public class FileIndexPredicate implements Closeable {
}
}
+ List<SortValue> orders = topN.orders();
String requiredName = orders.get(0).field().name();
Set<FileIndexReader> readers = reader.readColumnIndex(requiredName);
for (FileIndexReader reader : readers) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
index 6af3c6b34c..deb65ce00f 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
@@ -152,6 +152,16 @@ public class BitSliceIndexBitmap {
return gt(code - 1);
}
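+ /**
+ * Finds the k rows with the largest values in the bit-sliced index (BSI).
+ *
+ * <p>Refer to algorithm 4.1 in the paper <a
+ * href="https://www.cs.umb.edu/~poneil/SIGBSTMH.pdf">Bit-Sliced Index Arithmetic</a>.
+ *
+ * @param k the number of rows to return.
+ * @param foundSet the rows to select from, or {@code null} if there is no prior selection.
+ * @param strict if true, the result is trimmed to at most k rows; otherwise it is not trimmed
+ * and may contain more than k rows when values tie at the k-th position.
+ * @return a bitmap of the selected rows.
+ */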
public RoaringBitmap32 topK(int k, @Nullable RoaringBitmap32 foundSet,
boolean strict) {
if (k == 0 || (foundSet != null && foundSet.isEmpty())) {
return new RoaringBitmap32();
@@ -187,6 +197,7 @@ public class BitSliceIndexBitmap {
return f;
}
+ // return k rows
long n = f.getCardinality() - k;
if (n > 0) {
Iterator<Integer> iterator = e.iterator();
@@ -198,6 +209,13 @@ public class BitSliceIndexBitmap {
return f;
}
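+ /**
+ * Finds the k rows with the smallest values in the bit-sliced index (BSI).
+ *
+ * @param k the number of rows to return.
+ * @param foundSet the rows to select from, or {@code null} if there is no prior selection.
+ * @param strict if true, the result is trimmed to at most k rows; otherwise it is not trimmed
+ * and may contain more than k rows when values tie at the k-th position.
+ * @return a bitmap of the selected rows.
+ */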
public RoaringBitmap32 bottomK(int k, @Nullable RoaringBitmap32 foundSet,
boolean strict) {
if (k == 0 || (foundSet != null && foundSet.isEmpty())) {
return new RoaringBitmap32();
@@ -234,6 +252,7 @@ public class BitSliceIndexBitmap {
return f;
}
+ // return k rows
long n = f.getCardinality() - k;
if (n > 0) {
Iterator<Integer> iterator = e.iterator();
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
index fe2859824a..1482e24f13 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
@@ -155,18 +155,14 @@ public class RangeBitmapFileIndex implements FileIndexer {
@Override
public FileIndexResult visitTopN(TopN topN, FileIndexResult result) {
- List<SortValue> orders = topN.orders();
-
- // If multiple columns, use first column with strict=false (allow
duplicates)
- boolean strict = orders.size() == 1;
- SortValue sort = orders.get(0);
-
RoaringBitmap32 foundSet =
result instanceof BitmapIndexResult ? ((BitmapIndexResult)
result).get() : null;
int limit = topN.limit();
+ List<SortValue> orders = topN.orders();
+ SortValue sort = orders.get(0);
SortValue.NullOrdering nullOrdering = sort.nullOrdering();
-
+ boolean strict = orders.size() == 1;
if (ASCENDING.equals(sort.direction())) {
return new BitmapIndexResult(
() -> bitmap.bottomK(limit, nullOrdering, foundSet,
strict));
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
index e66de2c560..70b50c0c3a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
@@ -20,13 +20,15 @@ package org.apache.paimon.predicate;
import org.apache.paimon.predicate.SortValue.NullOrdering;
import org.apache.paimon.predicate.SortValue.SortDirection;
-import org.apache.paimon.utils.Preconditions;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.ListUtils.isNullOrEmpty;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Represents the TopN predicate. */
public class TopN implements Serializable {
@@ -36,7 +38,8 @@ public class TopN implements Serializable {
private final int limit;
public TopN(List<SortValue> orders, int limit) {
- this.orders = Preconditions.checkNotNull(orders);
+ checkArgument(!isNullOrEmpty(orders), "orders should not be null or
empty");
+ this.orders = orders;
this.limit = limit;
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index f021feab95..8d802ca61d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -104,37 +104,39 @@ class PaimonScanBuilder(table: InnerTable)
return false
}
- if (orders.length != 1) {
- return false
- }
-
- val fieldName = orders.head.expression() match {
- case nr: NamedReference => nr.fieldNames.mkString(".")
- case _ => return false
- }
-
- val rowType = table.rowType()
- if (rowType.notContainsField(fieldName)) {
- return false
- }
-
- val field = rowType.getField(fieldName)
- val ref = new FieldRef(field.id(), field.name(), field.`type`())
-
- val nullOrdering = orders.head.nullOrdering() match {
- case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
- case expressions.NullOrdering.NULLS_FIRST => NullOrdering.NULLS_FIRST
- case _ => return false
- }
-
- val direction = orders.head.direction() match {
- case expressions.SortDirection.DESCENDING => SortDirection.DESCENDING
- case expressions.SortDirection.ASCENDING => SortDirection.ASCENDING
- case _ => return false
- }
-
- val sort = new SortValue(ref, direction, nullOrdering)
- pushDownTopN = Some(new TopN(Collections.singletonList(sort), limit))
+ val sorts: List[SortValue] = orders
+ .map(
+ order => {
+ val fieldName = order.expression() match {
+ case nr: NamedReference => nr.fieldNames.mkString(".")
+ case _ => return false
+ }
+
+ val rowType = table.rowType()
+ if (rowType.notContainsField(fieldName)) {
+ return false
+ }
+
+ val field = rowType.getField(fieldName)
+ val ref = new FieldRef(field.id(), field.name(), field.`type`())
+
+ val nullOrdering = order.nullOrdering() match {
+ case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
+ case expressions.NullOrdering.NULLS_FIRST =>
NullOrdering.NULLS_FIRST
+ case _ => return false
+ }
+
+ val direction = order.direction() match {
+ case expressions.SortDirection.DESCENDING =>
SortDirection.DESCENDING
+ case expressions.SortDirection.ASCENDING => SortDirection.ASCENDING
+ case _ => return false
+ }
+
+ new SortValue(ref, direction, nullOrdering)
+ })
+ .toList
+
+ pushDownTopN = Some(new TopN(sorts.asJava, limit))
// just make the best effort to push down TopN
false
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 27c8c00846..8759f99f00 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -282,7 +282,7 @@ abstract class PaimonPushDownTestBase extends
PaimonSparkTestBase {
}
}
- test("Paimon pushDown: topN for append-only tables") {
+ test("Paimon pushDown: TopN for append-only tables") {
assume(gteqSpark3_3)
spark.sql("""
|CREATE TABLE T (pt INT, id INT, price BIGINT) PARTITIONED BY
(pt)
@@ -355,7 +355,44 @@ abstract class PaimonPushDownTestBase extends
PaimonSparkTestBase {
Assertions.assertTrue(qe1.optimizedPlan.containsPattern(LIMIT))
}
- test("Paimon pushDown: topN for primary-key tables with deletion vector") {
+ test("Paimon pushDown: multi TopN for append-only tables") {
+ assume(gteqSpark3_3)
+ spark.sql("""
+ |CREATE TABLE T (pt INT, id INT, price BIGINT) PARTITIONED BY
(pt)
+ |TBLPROPERTIES ('file-index.range-bitmap.columns'='id')
+ |""".stripMargin)
+ Assertions.assertTrue(getScanBuilder().isInstanceOf[SupportsPushDownTopN])
+
+ spark.sql("""
+ |INSERT INTO T VALUES
+ |(1, 10, 100L),
+ |(1, 20, 100L),
+ |(1, 20, 200L),
+ |(1, 20, 200L),
+ |(1, 20, 200L),
+ |(1, 20, 200L),
+ |(1, 20, 300L),
+ |(1, 30, 100L)
+ |""".stripMargin)
+
+ // test ASC
+ checkAnswer(
+ spark.sql("SELECT id, price FROM T ORDER BY id ASC, price ASC LIMIT 2"),
+ Row(10, 100L) :: Row(20, 100L) :: Nil)
+ checkAnswer(
+ spark.sql("SELECT id, price FROM T ORDER BY id ASC, price DESC LIMIT 2"),
+ Row(10, 100L) :: Row(20, 300L) :: Nil)
+
+ // test DESC
+ checkAnswer(
+ spark.sql("SELECT id, price FROM T ORDER BY id DESC, price ASC LIMIT 2"),
+ Row(30, 100L) :: Row(20, 100L) :: Nil)
+ checkAnswer(
+ spark.sql("SELECT id, price FROM T ORDER BY id DESC, price DESC LIMIT
2"),
+ Row(30, 100L) :: Row(20, 300L) :: Nil)
+ }
+
+ test("Paimon pushDown: TopN for primary-key tables with deletion vector") {
assume(gteqSpark3_3)
withTable("dv_test") {
spark.sql("""
@@ -398,7 +435,7 @@ abstract class PaimonPushDownTestBase extends
PaimonSparkTestBase {
}
}
- test("Paimon pushDown: topN for append-only tables with deletion vector") {
+ test("Paimon pushDown: TopN for append-only tables with deletion vector") {
assume(gteqSpark3_3)
withTable("dv_test") {
spark.sql("""