This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 11ce51cd3a [core] The TopN predicate for range-bitmap supports 
multiple sort keys (#6209)
11ce51cd3a is described below

commit 11ce51cd3a71e90e654cb26c83d471db4642efa3
Author: Glenn <[email protected]>
AuthorDate: Mon Sep 8 13:17:10 2025 +0800

    [core] The TopN predicate for range-bitmap supports multiple sort keys 
(#6209)
---
 docs/content/concepts/spec/fileindex.md            |  4 ++
 .../bitmap/RangeBitmapIndexPushDownBenchmark.java  | 46 ++++++++++++++++
 .../paimon/fileindex/FileIndexPredicate.java       |  7 +--
 .../fileindex/rangebitmap/BitSliceIndexBitmap.java | 19 +++++++
 .../rangebitmap/RangeBitmapFileIndex.java          | 10 +---
 .../java/org/apache/paimon/predicate/TopN.java     |  7 ++-
 .../apache/paimon/spark/PaimonScanBuilder.scala    | 64 +++++++++++-----------
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  | 43 ++++++++++++++-
 8 files changed, 151 insertions(+), 49 deletions(-)

diff --git a/docs/content/concepts/spec/fileindex.md 
b/docs/content/concepts/spec/fileindex.md
index 5533bb4d64..a02d257021 100644
--- a/docs/content/concepts/spec/fileindex.md
+++ b/docs/content/concepts/spec/fileindex.md
@@ -267,6 +267,10 @@ For now, the `TOPN` predicate optimization can not using 
with other predicates,
 SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score ASC LIMIT 10;
 
 SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score DESC LIMIT 10;
+
+-- if there are multiple sort keys, the first sort key must be created with 
range-bitmap.
+SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score ASC, col DESC LIMIT 
10;
+SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score DESC, col ASC LIMIT 
10;
 ```
 
 <pre>
diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
index c286b8727d..04b3306c1f 100644
--- 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
@@ -32,6 +32,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
@@ -50,6 +51,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -59,6 +61,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
 import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
 
 /** Benchmark for table read. */
@@ -92,6 +95,7 @@ public class RangeBitmapIndexPushDownBenchmark {
 
             // benchmark TopN
             benchmarkTopN(tables, bound, 1);
+            benchmarkMultipleTopN(tables, bound, 1);
         }
     }
 
@@ -233,6 +237,48 @@ public class RangeBitmapIndexPushDownBenchmark {
         benchmark.run();
     }
 
+    private void benchmarkMultipleTopN(Map<String, Table> tables, int bound, 
int k) {
+        Benchmark benchmark =
+                new Benchmark("multiple-topn", ROW_COUNT)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(false);
+        for (String name : tables.keySet()) {
+            benchmark.addCase(
+                    name + "-" + bound + "-" + k,
+                    1,
+                    () -> {
+                        Table table = tables.get(name);
+                        List<SortValue> orders =
+                                Arrays.asList(
+                                        new SortValue(
+                                                new FieldRef(0, "k", 
DataTypes.INT()),
+                                                DESCENDING,
+                                                NULLS_LAST),
+                                        new SortValue(
+                                                new FieldRef(1, "f1", 
DataTypes.STRING()),
+                                                ASCENDING,
+                                                NULLS_LAST));
+                        TopN topN = new TopN(orders, k);
+                        List<Split> splits = 
table.newReadBuilder().newScan().plan().splits();
+                        AtomicLong readCount = new AtomicLong(0);
+                        try {
+                            for (Split split : splits) {
+                                RecordReader<InternalRow> reader =
+                                        table.newReadBuilder()
+                                                .withTopN(topN)
+                                                .newRead()
+                                                .createReader(split);
+                                reader.forEachRemaining(row -> 
readCount.incrementAndGet());
+                                reader.close();
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+        }
+        benchmark.run();
+    }
+
     private Table prepareData(int bound, Options options, String tableName) 
throws Exception {
         Table table = createTable(options, tableName);
         StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index 90868224d7..8b93da7444 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -92,12 +92,6 @@ public class FileIndexPredicate implements Closeable {
             return result;
         }
 
-        // for now we only support single column.
-        List<SortValue> orders = topN.orders();
-        if (orders.size() != 1) {
-            return result;
-        }
-
         int k = topN.limit();
         if (result instanceof BitmapIndexResult) {
             long cardinality = ((BitmapIndexResult) 
result).get().getCardinality();
@@ -106,6 +100,7 @@ public class FileIndexPredicate implements Closeable {
             }
         }
 
+        List<SortValue> orders = topN.orders();
         String requiredName = orders.get(0).field().name();
         Set<FileIndexReader> readers = reader.readColumnIndex(requiredName);
         for (FileIndexReader reader : readers) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
index 6af3c6b34c..deb65ce00f 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
@@ -152,6 +152,16 @@ public class BitSliceIndexBitmap {
         return gt(code - 1);
     }
 
+    /**
+     * Find k rows with largest values in a BSI.
+     *
+     * <p>Refer to algorithm 4.1 in the paper <a
+     * href="https://www.cs.umb.edu/~poneil/SIGBSTMH.pdf">Bit-Sliced Index 
Arithmetic</a>
+     *
+     * @param k K largest values.
+     * @param foundSet the selection.
+     * @param strict if true, the result will be trimmed; otherwise, it will 
not be.
+     */
     public RoaringBitmap32 topK(int k, @Nullable RoaringBitmap32 foundSet, 
boolean strict) {
         if (k == 0 || (foundSet != null && foundSet.isEmpty())) {
             return new RoaringBitmap32();
@@ -187,6 +197,7 @@ public class BitSliceIndexBitmap {
             return f;
         }
 
+        // return k rows
         long n = f.getCardinality() - k;
         if (n > 0) {
             Iterator<Integer> iterator = e.iterator();
@@ -198,6 +209,13 @@ public class BitSliceIndexBitmap {
         return f;
     }
 
+    /**
+     * Find k rows with smallest values in a BSI.
+     *
+     * @param k K smallest values.
+     * @param foundSet the selection.
+     * @param strict if true, the result will be trimmed; otherwise, it will 
not be.
+     */
     public RoaringBitmap32 bottomK(int k, @Nullable RoaringBitmap32 foundSet, 
boolean strict) {
         if (k == 0 || (foundSet != null && foundSet.isEmpty())) {
             return new RoaringBitmap32();
@@ -234,6 +252,7 @@ public class BitSliceIndexBitmap {
             return f;
         }
 
+        // return k rows
         long n = f.getCardinality() - k;
         if (n > 0) {
             Iterator<Integer> iterator = e.iterator();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
index fe2859824a..1482e24f13 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
@@ -155,18 +155,14 @@ public class RangeBitmapFileIndex implements FileIndexer {
 
         @Override
         public FileIndexResult visitTopN(TopN topN, FileIndexResult result) {
-            List<SortValue> orders = topN.orders();
-
-            // If multiple columns, use first column with strict=false (allow 
duplicates)
-            boolean strict = orders.size() == 1;
-            SortValue sort = orders.get(0);
-
             RoaringBitmap32 foundSet =
                     result instanceof BitmapIndexResult ? ((BitmapIndexResult) 
result).get() : null;
 
             int limit = topN.limit();
+            List<SortValue> orders = topN.orders();
+            SortValue sort = orders.get(0);
             SortValue.NullOrdering nullOrdering = sort.nullOrdering();
-
+            boolean strict = orders.size() == 1;
             if (ASCENDING.equals(sort.direction())) {
                 return new BitmapIndexResult(
                         () -> bitmap.bottomK(limit, nullOrdering, foundSet, 
strict));
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
index e66de2c560..70b50c0c3a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
@@ -20,13 +20,15 @@ package org.apache.paimon.predicate;
 
 import org.apache.paimon.predicate.SortValue.NullOrdering;
 import org.apache.paimon.predicate.SortValue.SortDirection;
-import org.apache.paimon.utils.Preconditions;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.ListUtils.isNullOrEmpty;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Represents the TopN predicate. */
 public class TopN implements Serializable {
 
@@ -36,7 +38,8 @@ public class TopN implements Serializable {
     private final int limit;
 
     public TopN(List<SortValue> orders, int limit) {
-        this.orders = Preconditions.checkNotNull(orders);
+        checkArgument(!isNullOrEmpty(orders), "orders should not be null or 
empty");
+        this.orders = orders;
         this.limit = limit;
     }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index f021feab95..8d802ca61d 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -104,37 +104,39 @@ class PaimonScanBuilder(table: InnerTable)
       return false
     }
 
-    if (orders.length != 1) {
-      return false
-    }
-
-    val fieldName = orders.head.expression() match {
-      case nr: NamedReference => nr.fieldNames.mkString(".")
-      case _ => return false
-    }
-
-    val rowType = table.rowType()
-    if (rowType.notContainsField(fieldName)) {
-      return false
-    }
-
-    val field = rowType.getField(fieldName)
-    val ref = new FieldRef(field.id(), field.name(), field.`type`())
-
-    val nullOrdering = orders.head.nullOrdering() match {
-      case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
-      case expressions.NullOrdering.NULLS_FIRST => NullOrdering.NULLS_FIRST
-      case _ => return false
-    }
-
-    val direction = orders.head.direction() match {
-      case expressions.SortDirection.DESCENDING => SortDirection.DESCENDING
-      case expressions.SortDirection.ASCENDING => SortDirection.ASCENDING
-      case _ => return false
-    }
-
-    val sort = new SortValue(ref, direction, nullOrdering)
-    pushDownTopN = Some(new TopN(Collections.singletonList(sort), limit))
+    val sorts: List[SortValue] = orders
+      .map(
+        order => {
+          val fieldName = order.expression() match {
+            case nr: NamedReference => nr.fieldNames.mkString(".")
+            case _ => return false
+          }
+
+          val rowType = table.rowType()
+          if (rowType.notContainsField(fieldName)) {
+            return false
+          }
+
+          val field = rowType.getField(fieldName)
+          val ref = new FieldRef(field.id(), field.name(), field.`type`())
+
+          val nullOrdering = order.nullOrdering() match {
+            case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
+            case expressions.NullOrdering.NULLS_FIRST => 
NullOrdering.NULLS_FIRST
+            case _ => return false
+          }
+
+          val direction = order.direction() match {
+            case expressions.SortDirection.DESCENDING => 
SortDirection.DESCENDING
+            case expressions.SortDirection.ASCENDING => SortDirection.ASCENDING
+            case _ => return false
+          }
+
+          new SortValue(ref, direction, nullOrdering)
+        })
+      .toList
+
+    pushDownTopN = Some(new TopN(sorts.asJava, limit))
 
     // just make the best effort to push down TopN
     false
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 27c8c00846..8759f99f00 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -282,7 +282,7 @@ abstract class PaimonPushDownTestBase extends 
PaimonSparkTestBase {
     }
   }
 
-  test("Paimon pushDown: topN for append-only tables") {
+  test("Paimon pushDown: TopN for append-only tables") {
     assume(gteqSpark3_3)
     spark.sql("""
                 |CREATE TABLE T (pt INT, id INT, price BIGINT) PARTITIONED BY 
(pt)
@@ -355,7 +355,44 @@ abstract class PaimonPushDownTestBase extends 
PaimonSparkTestBase {
     Assertions.assertTrue(qe1.optimizedPlan.containsPattern(LIMIT))
   }
 
-  test("Paimon pushDown: topN for primary-key tables with deletion vector") {
+  test("Paimon pushDown: multi TopN for append-only tables") {
+    assume(gteqSpark3_3)
+    spark.sql("""
+                |CREATE TABLE T (pt INT, id INT, price BIGINT) PARTITIONED BY 
(pt)
+                |TBLPROPERTIES ('file-index.range-bitmap.columns'='id')
+                |""".stripMargin)
+    Assertions.assertTrue(getScanBuilder().isInstanceOf[SupportsPushDownTopN])
+
+    spark.sql("""
+                |INSERT INTO T VALUES
+                |(1, 10, 100L),
+                |(1, 20, 100L),
+                |(1, 20, 200L),
+                |(1, 20, 200L),
+                |(1, 20, 200L),
+                |(1, 20, 200L),
+                |(1, 20, 300L),
+                |(1, 30, 100L)
+                |""".stripMargin)
+
+    // test ASC
+    checkAnswer(
+      spark.sql("SELECT id, price FROM T ORDER BY id ASC, price ASC LIMIT 2"),
+      Row(10, 100L) :: Row(20, 100L) :: Nil)
+    checkAnswer(
+      spark.sql("SELECT id, price FROM T ORDER BY id ASC, price DESC LIMIT 2"),
+      Row(10, 100L) :: Row(20, 300L) :: Nil)
+
+    // test DESC
+    checkAnswer(
+      spark.sql("SELECT id, price FROM T ORDER BY id DESC, price ASC LIMIT 2"),
+      Row(30, 100L) :: Row(20, 100L) :: Nil)
+    checkAnswer(
+      spark.sql("SELECT id, price FROM T ORDER BY id DESC, price DESC LIMIT 
2"),
+      Row(30, 100L) :: Row(20, 300L) :: Nil)
+  }
+
+  test("Paimon pushDown: TopN for primary-key tables with deletion vector") {
     assume(gteqSpark3_3)
     withTable("dv_test") {
       spark.sql("""
@@ -398,7 +435,7 @@ abstract class PaimonPushDownTestBase extends 
PaimonSparkTestBase {
     }
   }
 
-  test("Paimon pushDown: topN for append-only tables with deletion vector") {
+  test("Paimon pushDown: TopN for append-only tables with deletion vector") {
     assume(gteqSpark3_3)
     withTable("dv_test") {
       spark.sql("""

Reply via email to