This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ce2ab8cbac [core] Support TopN pushdown for append-only table (#6028)
ce2ab8cbac is described below

commit ce2ab8cbac241ed9421f05662f2bc1068c8a7e0d
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Mon Aug 18 12:53:56 2025 +0800

    [core] Support TopN pushdown for append-only table (#6028)
---
 docs/content/append-table/query-performance.md     |  39 +++-
 docs/content/concepts/spec/fileindex.md            |   3 +-
 .../bitmap/RangeBitmapIndexPushDownBenchmark.java  | 255 +++++++++++++++++++++
 .../paimon/fileindex/FileIndexPredicate.java       |  35 +++
 .../apache/paimon/fileindex/FileIndexReader.java   |   5 +
 .../fileindex/rangebitmap/BitSliceIndexBitmap.java |  94 +++++++-
 .../paimon/fileindex/rangebitmap/RangeBitmap.java  |  68 +++++-
 .../rangebitmap/RangeBitmapFileIndex.java          |  25 ++
 .../org/apache/paimon/predicate/SortValue.java     |  91 ++++++++
 .../{utils/ListUtils.java => predicate/TopN.java}  |  37 ++-
 .../java/org/apache/paimon/utils/ListUtils.java    |   4 +
 .../org/apache/paimon/utils/RoaringBitmap32.java   |   8 +
 .../rangebitmap/BitSliceIndexBitmapTest.java       |  90 ++++++++
 .../rangebitmap/RangeBitmapFileIndexTest.java      | 178 +++++++++++++-
 .../org/apache/paimon/io/FileIndexEvaluator.java   |  57 +++--
 .../paimon/io/KeyValueFileReaderFactory.java       |   2 +-
 .../paimon/operation/DataEvolutionSplitRead.java   |   1 +
 .../apache/paimon/operation/RawFileSplitRead.java  |  12 +-
 .../org/apache/paimon/operation/SplitRead.java     |   5 +
 .../paimon/table/source/AppendTableRead.java       |  10 +
 .../apache/paimon/table/source/InnerTableRead.java |   5 +
 .../apache/paimon/table/source/ReadBuilder.java    |   7 +
 .../paimon/table/source/ReadBuilderImpl.java       |  11 +
 .../apache/paimon/utils/FormatReaderMapping.java   |  19 +-
 .../paimon/table/AppendOnlySimpleTableTest.java    | 104 +++++++++
 .../paimon/utils/FormatReaderMappingTest.java      |   3 +-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   3 +-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   3 +-
 .../paimon/spark/ColumnPruningAndPushDown.scala    |   4 +-
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |   7 +-
 .../paimon/spark/PaimonBaseScanBuilder.scala       |  12 +-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |   3 +-
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  72 +++++-
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  66 +++++-
 34 files changed, 1272 insertions(+), 66 deletions(-)

diff --git a/docs/content/append-table/query-performance.md 
b/docs/content/append-table/query-performance.md
index 1c3d48468d..7c15455b3d 100644
--- a/docs/content/append-table/query-performance.md
+++ b/docs/content/append-table/query-performance.md
@@ -83,8 +83,43 @@ scenario. Using a bitmap may consume more space but can 
result in greater accura
 `Bitmap`:
 * `file-index.bitmap.columns`: specify the columns that need bitmap index. See 
[Index Bitmap]({{< ref "concepts/spec/fileindex#index-bitmap" >}}).
 
-`Bit-Slice Index Bitmap`
-* `file-index.bsi.columns`: specify the columns that need bsi index.
+`Range-Bitmap Index`
+* `file-index.range-bitmap.columns`: specify the columns that need 
range-bitmap index. See [Index Range Bitmap]({{< ref 
"concepts/spec/fileindex#index-range-bitmap" >}}).
+
+
+Append Table supports using the range-bitmap file index to optimize the `EQUALS`, 
`RANGE`, `AND/OR` and `TOPN` predicates. The bitmap and range-bitmap file index 
results will be merged and pushed down to the DataFile for filtering row groups 
and pages.
+
+In the following query examples, the `class_id` and `score` columns have been 
created with a range-bitmap file index. The partition key `dt` is not 
required.
+
+**Optimize the `EQUALS` predicate:**
+```sql
+SELECT * FROM TABLE WHERE dt = '20250801' AND score = 100;
+
+SELECT * FROM TABLE WHERE dt = '20250801' AND score IN (60, 80);
+```
+
+**Optimize the `RANGE` predicate:**
+```sql
+SELECT * FROM TABLE WHERE dt = '20250801' AND score > 60;
+
+SELECT * FROM TABLE WHERE dt = '20250801' AND score < 60;
+```
+
+**Optimize the `AND/OR` predicate:**
+```sql
+SELECT * FROM TABLE WHERE dt = '20250801' AND class_id = 1 AND score < 60;
+
+SELECT * FROM TABLE WHERE dt = '20250801' AND class_id = 1 AND score < 60 OR 
score > 80;
+```
+
+**Optimize the `TOPN` predicate:**
+
+For now, the `TOPN` predicate optimization cannot be combined with other 
predicates, and it is only supported in Apache Spark.
+```sql
+SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score ASC LIMIT 10;
+
+SELECT * FROM TABLE WHERE dt = '20250801' ORDER BY score DESC LIMIT 10;
+```
 
 More filter types will be supported...
 
diff --git a/docs/content/concepts/spec/fileindex.md 
b/docs/content/concepts/spec/fileindex.md
index 7a46fb8d1a..b041139a8e 100644
--- a/docs/content/concepts/spec/fileindex.md
+++ b/docs/content/concepts/spec/fileindex.md
@@ -222,7 +222,8 @@ LocalZonedTimestampType, TimestampType, CharType, 
VarCharType, StringType, Boole
 Advantage:
 1. Smaller than the bitmap index.
 2. Suitable for the point query and the range query in the high level of 
cardinality scenarios.
-3. Can be used conjunction with bitmap index.
+3. Can be used to optimize the AND/OR predicates. (The corresponding columns 
need to have either a bitmap index or a range-bitmap index.)
+4. Can be used to optimize topK/bottomK queries. (Currently only suitable 
for append-only tables.)
 
 Shortcoming:
 1. The point query evaluation maybe slower than bitmap index.
diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
new file mode 100644
index 0000000000..fa0600911a
--- /dev/null
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.benchmark.bitmap;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.benchmark.Benchmark;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
+
+/** Benchmark for table read. */
+public class RangeBitmapIndexPushDownBenchmark {
+
+    private static final int VALUE_COUNT = 15;
+    private static final int ROW_COUNT = 1000000;
+    public static final int[] BOUNDS = new int[] {300, 3000, 30000, 300000, 
3000000};
+
+    @TempDir java.nio.file.Path tempFile;
+
+    private final RandomDataGenerator random = new RandomDataGenerator();
+
+    @Test
+    public void testParquet() throws Exception {
+        for (int bound : BOUNDS) {
+            Table table = prepareData(bound, parquet(), "parquet_" + bound);
+            Map<String, Table> tables = new LinkedHashMap<>();
+            tables.put(
+                    "without-index",
+                    
table.copy(Collections.singletonMap("file-index.read.enabled", "false")));
+            tables.put(
+                    "with-index",
+                    
table.copy(Collections.singletonMap("file-index.read.enabled", "true")));
+
+            // benchmark equals
+            benchmarkEquals(tables, bound, random.nextInt(0, bound));
+
+            // benchmark between
+            benchmarkBetween(tables, bound, 0, 100);
+
+            // benchmark TopN
+            benchmarkTopN(tables, bound, 1);
+        }
+    }
+
+    private Options parquet() {
+        Options options = new Options();
+        options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET);
+        options.set("file-index.range-bitmap.columns", "k");
+        return options;
+    }
+
+    private void benchmarkEquals(Map<String, Table> tables, int bound, int 
value) {
+        Benchmark benchmark =
+                new Benchmark("equals", ROW_COUNT)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(false);
+        for (String name : tables.keySet()) {
+            benchmark.addCase(
+                    name + "-" + bound,
+                    5,
+                    () -> {
+                        Table table = tables.get(name);
+                        Predicate predicate = new 
PredicateBuilder(table.rowType()).equal(0, value);
+                        List<Split> splits = 
table.newReadBuilder().newScan().plan().splits();
+                        AtomicLong readCount = new AtomicLong(0);
+                        try {
+                            for (Split split : splits) {
+                                RecordReader<InternalRow> reader =
+                                        table.newReadBuilder()
+                                                .withFilter(predicate)
+                                                .newRead()
+                                                .createReader(split);
+                                reader.forEachRemaining(row -> 
readCount.incrementAndGet());
+                                reader.close();
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+        }
+        benchmark.run();
+    }
+
+    private void benchmarkBetween(Map<String, Table> tables, int bound, int 
from, int to) {
+        Benchmark benchmark =
+                new Benchmark("between", ROW_COUNT)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(false);
+        for (String name : tables.keySet()) {
+            benchmark.addCase(
+                    name + "-" + bound + "-" + from + "-" + to,
+                    5,
+                    () -> {
+                        Table table = tables.get(name);
+                        Predicate predicate =
+                                new 
PredicateBuilder(table.rowType()).between(0, from, to);
+                        List<Split> splits = 
table.newReadBuilder().newScan().plan().splits();
+                        AtomicLong readCount = new AtomicLong(0);
+                        try {
+                            for (Split split : splits) {
+                                RecordReader<InternalRow> reader =
+                                        table.newReadBuilder()
+                                                .withFilter(predicate)
+                                                .newRead()
+                                                .createReader(split);
+                                reader.forEachRemaining(row -> 
readCount.incrementAndGet());
+                                reader.close();
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+        }
+        benchmark.run();
+    }
+
+    private void benchmarkTopN(Map<String, Table> tables, int bound, int k) {
+        Benchmark benchmark =
+                new Benchmark("topn", 
ROW_COUNT).setNumWarmupIters(1).setOutputPerIteration(false);
+        for (String name : tables.keySet()) {
+            benchmark.addCase(
+                    name + "-" + bound + "-" + k,
+                    1,
+                    () -> {
+                        Table table = tables.get(name);
+                        FieldRef ref = new FieldRef(0, "k", DataTypes.INT());
+                        SortValue sort = new SortValue(ref, DESCENDING, 
NULLS_LAST);
+                        TopN topN = new TopN(Collections.singletonList(sort), 
k);
+                        List<Split> splits = 
table.newReadBuilder().newScan().plan().splits();
+                        AtomicLong readCount = new AtomicLong(0);
+                        try {
+                            for (Split split : splits) {
+                                RecordReader<InternalRow> reader =
+                                        table.newReadBuilder()
+                                                .withTopN(topN)
+                                                .newRead()
+                                                .createReader(split);
+                                reader.forEachRemaining(row -> 
readCount.incrementAndGet());
+                                reader.close();
+                            }
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+        }
+        benchmark.run();
+    }
+
+    private Table prepareData(int bound, Options options, String tableName) 
throws Exception {
+        Table table = createTable(options, tableName);
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
+        StreamTableWrite write = writeBuilder.newWrite();
+        StreamTableCommit commit = writeBuilder.newCommit();
+        AtomicInteger writeCount = new AtomicInteger(0);
+        for (int i = 0; i < ROW_COUNT; i++) {
+            try {
+                write.write(newRandomRow(bound));
+                writeCount.incrementAndGet();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        List<CommitMessage> commitMessages = write.prepareCommit(true, 1);
+        commit.commit(1, commitMessages);
+
+        write.close();
+        return table;
+    }
+
+    protected Table createTable(Options tableOptions, String tableName) throws 
Exception {
+        Options catalogOptions = new Options();
+        catalogOptions.set(CatalogOptions.WAREHOUSE, 
tempFile.toUri().toString());
+        Catalog catalog = 
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
+        String database = "default";
+        catalog.createDatabase(database, true);
+
+        List<DataField> fields = new ArrayList<>();
+        fields.add(new DataField(0, "k", new IntType()));
+        for (int i = 1; i <= VALUE_COUNT; i++) {
+            fields.add(new DataField(i, "f" + i, DataTypes.STRING()));
+        }
+        Schema schema =
+                new Schema(
+                        fields,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        tableOptions.toMap(),
+                        "");
+        Identifier identifier = Identifier.create(database, tableName);
+        catalog.createTable(identifier, schema, false);
+        return catalog.getTable(identifier);
+    }
+
+    protected InternalRow newRandomRow(int bound) {
+        GenericRow row = new GenericRow(1 + VALUE_COUNT);
+        row.setField(0, random.nextInt(0, bound));
+        for (int i = 1; i <= VALUE_COUNT; i++) {
+            row.setField(i, BinaryString.fromString(random.nextHexString(32)));
+        }
+        return row;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
index 8f5485dbe6..90868224d7 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.fileindex;
 
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
 import org.apache.paimon.fs.ByteArraySeekableStream;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -28,6 +29,8 @@ import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateVisitor;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.types.RowType;
 
 import org.slf4j.Logger;
@@ -40,6 +43,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -83,6 +87,37 @@ public class FileIndexPredicate implements Closeable {
         return result;
     }
 
+    public FileIndexResult evaluateTopN(@Nullable TopN topN, FileIndexResult 
result) {
+        if (topN == null || !result.remain()) {
+            return result;
+        }
+
+        // for now we only support single column.
+        List<SortValue> orders = topN.orders();
+        if (orders.size() != 1) {
+            return result;
+        }
+
+        int k = topN.limit();
+        if (result instanceof BitmapIndexResult) {
+            long cardinality = ((BitmapIndexResult) 
result).get().getCardinality();
+            if (cardinality <= k) {
+                return result;
+            }
+        }
+
+        String requiredName = orders.get(0).field().name();
+        Set<FileIndexReader> readers = reader.readColumnIndex(requiredName);
+        for (FileIndexReader reader : readers) {
+            FileIndexResult ret = reader.visitTopN(topN, result);
+            if (!REMAIN.equals(ret)) {
+                ret.remain();
+                return ret;
+            }
+        }
+        return result;
+    }
+
     private Set<String> getRequiredNames(Predicate filePredicate) {
         return filePredicate.visit(
                 new PredicateVisitor<Set<String>>() {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
index d715e6465f..fbb8efb777 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java
@@ -20,6 +20,7 @@ package org.apache.paimon.fileindex;
 
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.FunctionVisitor;
+import org.apache.paimon.predicate.TopN;
 
 import java.util.List;
 
@@ -119,4 +120,8 @@ public abstract class FileIndexReader implements 
FunctionVisitor<FileIndexResult
     public FileIndexResult visitOr(List<FileIndexResult> children) {
         throw new UnsupportedOperationException("Should not invoke this");
     }
+
+    public FileIndexResult visitTopN(TopN topN, FileIndexResult result) {
+        return REMAIN;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
index 0708b43774..978d4635c9 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmap.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.paimon.utils.IOUtils.readFully;
@@ -151,7 +152,98 @@ public class BitSliceIndexBitmap {
         return gt(code - 1);
     }
 
+    public RoaringBitmap32 topK(int k, @Nullable RoaringBitmap32 foundSet) {
+        if (k == 0 || (foundSet != null && foundSet.isEmpty())) {
+            return new RoaringBitmap32();
+        }
+
+        if (k < 0) {
+            throw new IllegalArgumentException("the k param can not be 
negative in topK, k=" + k);
+        }
+
+        RoaringBitmap32 g = new RoaringBitmap32();
+        RoaringBitmap32 e = isNotNull(foundSet);
+        if (e.getCardinality() <= k) {
+            return e;
+        }
+
+        loadSlices(0, slices.length);
+        for (int i = slices.length - 1; i >= 0; i--) {
+            RoaringBitmap32 x = RoaringBitmap32.or(g, RoaringBitmap32.and(e, 
getSlice(i)));
+            long n = x.getCardinality();
+            if (n > k) {
+                e = RoaringBitmap32.and(e, getSlice(i));
+            } else if (n < k) {
+                g = x;
+                e = RoaringBitmap32.andNot(e, getSlice(i));
+            } else {
+                e = RoaringBitmap32.and(e, getSlice(i));
+                break;
+            }
+        }
+
+        // only k results should be returned
+        RoaringBitmap32 f = RoaringBitmap32.or(g, e);
+        long n = f.getCardinality() - k;
+        if (n > 0) {
+            Iterator<Integer> iterator = e.iterator();
+            while (iterator.hasNext() && n > 0) {
+                f.remove(iterator.next());
+                n--;
+            }
+        }
+        return f;
+    }
+
+    public RoaringBitmap32 bottomK(int k, @Nullable RoaringBitmap32 foundSet) {
+        if (k == 0 || (foundSet != null && foundSet.isEmpty())) {
+            return new RoaringBitmap32();
+        }
+
+        if (k < 0) {
+            throw new IllegalArgumentException(
+                    "the k param can not be negative in bottomK, k=" + k);
+        }
+
+        RoaringBitmap32 g = new RoaringBitmap32();
+        RoaringBitmap32 e = isNotNull(foundSet);
+        if (e.getCardinality() <= k) {
+            return e;
+        }
+
+        loadSlices(0, slices.length);
+        for (int i = slices.length - 1; i >= 0; i--) {
+            RoaringBitmap32 x = RoaringBitmap32.or(g, 
RoaringBitmap32.andNot(e, getSlice(i)));
+            long n = x.getCardinality();
+            if (n > k) {
+                e = RoaringBitmap32.andNot(e, getSlice(i));
+            } else if (n < k) {
+                g = x;
+                e = RoaringBitmap32.and(e, getSlice(i));
+            } else {
+                e = RoaringBitmap32.andNot(e, getSlice(i));
+                break;
+            }
+        }
+
+        // only k results should be returned
+        RoaringBitmap32 f = RoaringBitmap32.or(g, e);
+        long n = f.getCardinality() - k;
+        if (n > 0) {
+            Iterator<Integer> iterator = e.iterator();
+            while (iterator.hasNext() && n > 0) {
+                f.remove(iterator.next());
+                n--;
+            }
+        }
+        return f;
+    }
+
     public RoaringBitmap32 isNotNull() {
+        return isNotNull(null);
+    }
+
+    private RoaringBitmap32 isNotNull(@Nullable RoaringBitmap32 foundSet) {
         if (ebm == null) {
             try {
                 in.seek(bodyOffset);
@@ -164,7 +256,7 @@ public class BitSliceIndexBitmap {
                 throw new RuntimeException(e);
             }
         }
-        return ebm.clone();
+        return foundSet == null ? ebm.clone() : RoaringBitmap32.and(ebm, 
foundSet);
     }
 
     private void loadSlices(int begin, int end) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmap.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmap.java
index f8d0ceba68..3c1c2626b3 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmap.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmap.java
@@ -22,6 +22,7 @@ import 
org.apache.paimon.fileindex.rangebitmap.dictionary.Dictionary;
 import 
org.apache.paimon.fileindex.rangebitmap.dictionary.chunked.ChunkedDictionary;
 import org.apache.paimon.fileindex.rangebitmap.dictionary.chunked.KeyFactory;
 import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.RoaringBitmap32;
 
@@ -34,6 +35,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.function.BiFunction;
+
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
 
 /** Implementation of range-bitmap. */
 public class RangeBitmap {
@@ -208,13 +212,7 @@ public class RangeBitmap {
     }
 
     public RoaringBitmap32 isNull() {
-        if (cardinality <= 0) {
-            return rid > 0 ? RoaringBitmap32.bitmapOf(0, rid - 1) : new 
RoaringBitmap32();
-        }
-
-        RoaringBitmap32 bitmap = isNotNull();
-        bitmap.flip(0, rid);
-        return bitmap;
+        return isNull(null);
     }
 
     public RoaringBitmap32 isNotNull() {
@@ -225,6 +223,23 @@ public class RangeBitmap {
         return getBitSliceIndexBitmap().isNotNull();
     }
 
+    private RoaringBitmap32 isNull(@Nullable RoaringBitmap32 foundSet) {
+        if (cardinality <= 0) {
+            return rid > 0 ? RoaringBitmap32.bitmapOf(0, rid - 1) : new 
RoaringBitmap32();
+        }
+
+        if (foundSet != null && foundSet.isEmpty()) {
+            return foundSet;
+        }
+
+        RoaringBitmap32 bitmap = isNotNull();
+        bitmap.flip(0, rid);
+        if (foundSet != null) {
+            bitmap.and(foundSet);
+        }
+        return bitmap;
+    }
+
     public Object get(int position) {
         if (position < 0 || position >= rid) {
             return null;
@@ -264,6 +279,45 @@ public class RangeBitmap {
         return bsi;
     }
 
+    public RoaringBitmap32 topK(
+            int k, SortValue.NullOrdering nullOrdering, @Nullable 
RoaringBitmap32 foundSet) {
+        return fillNulls(k, nullOrdering, foundSet, (l, r) -> 
getBitSliceIndexBitmap().topK(l, r));
+    }
+
+    public RoaringBitmap32 bottomK(
+            int k, SortValue.NullOrdering nullOrdering, @Nullable 
RoaringBitmap32 foundSet) {
+        return fillNulls(
+                k, nullOrdering, foundSet, (l, r) -> 
getBitSliceIndexBitmap().bottomK(l, r));
+    }
+
+    private RoaringBitmap32 fillNulls(
+            int k,
+            SortValue.NullOrdering nullOrdering,
+            @Nullable RoaringBitmap32 foundSet,
+            BiFunction<Integer, RoaringBitmap32, RoaringBitmap32> function) {
+        if (cardinality <= 0) {
+            return rid > 0 ? RoaringBitmap32.bitmapOf(0, rid - 1) : new 
RoaringBitmap32();
+        }
+
+        RoaringBitmap32 bitmap;
+        if (NULLS_LAST.equals(nullOrdering)) {
+            bitmap = function.apply(k, foundSet);
+            long cardinality = bitmap.getCardinality();
+            if (cardinality >= k) {
+                return bitmap;
+            }
+            bitmap.or(isNull(foundSet).limit((int) (k - cardinality)));
+        } else {
+            bitmap = isNull(foundSet);
+            long cardinality = bitmap.getCardinality();
+            if (cardinality >= k) {
+                return bitmap.limit(k);
+            }
+            bitmap.or(function.apply((int) (k - cardinality), foundSet));
+        }
+        return bitmap;
+    }
+
     /** A Builder for {@link RangeBitmap}. */
     public static class Appender {
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
index 09e3bfe013..65deca4553 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndex.java
@@ -28,12 +28,17 @@ import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import java.util.List;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
+
 /** Implementation of range-bitmap file index. */
 public class RangeBitmapFileIndex implements FileIndexer {
 
@@ -147,5 +152,25 @@ public class RangeBitmapFileIndex implements FileIndexer {
         public FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object 
literal) {
             return new BitmapIndexResult(() -> 
bitmap.gte(converter.apply(literal)));
         }
+
+        @Override
+        public FileIndexResult visitTopN(TopN topN, FileIndexResult result) {
+            List<SortValue> orders = topN.orders();
+            if (orders.size() != 1) {
+                return FileIndexResult.REMAIN;
+            }
+
+            RoaringBitmap32 foundSet =
+                    result instanceof BitmapIndexResult ? ((BitmapIndexResult) 
result).get() : null;
+
+            int limit = topN.limit();
+            SortValue sort = topN.orders().get(0);
+            SortValue.NullOrdering nullOrdering = sort.nullOrdering();
+            if (ASCENDING.equals(sort.direction())) {
+                return new BitmapIndexResult(() -> bitmap.bottomK(limit, 
nullOrdering, foundSet));
+            } else {
+                return new BitmapIndexResult(() -> bitmap.topK(limit, 
nullOrdering, foundSet));
+            }
+        }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java
new file mode 100644
index 0000000000..61d7913510
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.paimon.predicate;

import org.apache.paimon.utils.Preconditions;

import java.io.Serializable;
import java.util.Objects;

/**
 * Represents one sort key of a sorting expression: the referenced field, the sort direction and
 * how NULL values are ordered.
 *
 * <p>Instances are immutable and {@link Serializable} so they can travel with a scan plan.
 * Value equality is defined over all three components.
 */
public class SortValue implements Serializable {

    private static final long serialVersionUID = 1L;

    private final FieldRef field;
    private final SortDirection direction;
    private final NullOrdering nullOrdering;

    public SortValue(FieldRef field, SortDirection direction, NullOrdering nullOrdering) {
        this.field = Preconditions.checkNotNull(field);
        this.direction = Preconditions.checkNotNull(direction);
        this.nullOrdering = Preconditions.checkNotNull(nullOrdering);
    }

    /** The field this sort key refers to. Never null. */
    public FieldRef field() {
        return field;
    }

    /** Ascending or descending order. Never null. */
    public SortDirection direction() {
        return direction;
    }

    /** Whether NULLs sort before or after non-null values. Never null. */
    public NullOrdering nullOrdering() {
        return nullOrdering;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SortValue)) {
            return false;
        }
        SortValue that = (SortValue) o;
        return Objects.equals(field, that.field)
                && direction == that.direction
                && nullOrdering == that.nullOrdering;
    }

    @Override
    public int hashCode() {
        return Objects.hash(field, direction, nullOrdering);
    }

    @Override
    public String toString() {
        return String.format(
                "%s %s %s", field.name(), direction.toString(), nullOrdering.toString());
    }

    /** A null order used in sorting expressions. */
    public enum NullOrdering {
        NULLS_FIRST("NULLS FIRST"),
        NULLS_LAST("NULLS LAST");

        private final String name;

        NullOrdering(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /** A sort direction used in sorting expressions. */
    public enum SortDirection {
        ASCENDING("ASC"),
        DESCENDING("DESC");

        private final String name;

        SortDirection(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
similarity index 50%
copy from paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
copy to paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
index 6919ac15f4..2ecb38cce3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
@@ -16,19 +16,38 @@
  * limitations under the License.
  */
 
package org.apache.paimon.predicate;

import org.apache.paimon.utils.Preconditions;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
 * Represents the TopN predicate: keep at most {@code limit} rows, ranked by the given sort
 * orders.
 *
 * <p>Immutable and {@link Serializable} so it can be pushed down with a scan plan. Value
 * equality is defined over the orders and the limit.
 */
public class TopN implements Serializable {

    private static final long serialVersionUID = 1L;

    private final List<SortValue> orders;
    private final int limit;

    public TopN(List<SortValue> orders, int limit) {
        this.orders = Preconditions.checkNotNull(orders);
        // A negative limit is a caller bug; fail fast instead of producing odd results later.
        checkArgument(limit >= 0, "TopN limit must be non-negative, but was %s", limit);
        this.limit = limit;
    }

    /** The sort keys, in priority order. Never null (may be empty). */
    public List<SortValue> orders() {
        return orders;
    }

    /** The maximum number of rows to keep. Always non-negative. */
    public int limit() {
        return limit;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TopN)) {
            return false;
        }
        TopN that = (TopN) o;
        return limit == that.limit && Objects.equals(orders, that.orders);
    }

    @Override
    public int hashCode() {
        return Objects.hash(orders, limit);
    }

    @Override
    public String toString() {
        String sort = orders.stream().map(SortValue::toString).collect(Collectors.joining(", "));
        return String.format("Sort(%s), Limit(%s)", sort, limit);
    }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
index 6919ac15f4..e870b68e1d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
@@ -31,4 +31,8 @@ public class ListUtils {
         int index = ThreadLocalRandom.current().nextInt(list.size());
         return list.get(index);
     }
+
+    public static <T> boolean isNullOrEmpty(List<T> list) {
+        return list == null || list.isEmpty();
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 6b7078efaf..8c3b2802ac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -105,6 +105,14 @@ public class RoaringBitmap32 {
         return roaringBitmap.intersects(minimum, supremum);
     }
 
    /**
     * Returns a new bitmap containing at most the {@code k} smallest values of this bitmap
     * (delegates to the underlying RoaringBitmap's {@code limit}); this bitmap is unchanged.
     */
    public RoaringBitmap32 limit(int k) {
        return new RoaringBitmap32(roaringBitmap.limit(k));
    }
+
    /** Removes the given value from this bitmap if present — mutates this bitmap in place. */
    public void remove(int position) {
        roaringBitmap.remove(position);
    }
+
     public RoaringBitmap32 clone() {
         return new RoaringBitmap32(roaringBitmap.clone());
     }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmapTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmapTest.java
index 1a52a05db7..997f11eeea 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmapTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/BitSliceIndexBitmapTest.java
@@ -108,6 +108,96 @@ public class BitSliceIndexBitmapTest {
         for (Pair<Integer, Integer> pair : pairs) {
             assertThat(bsi.get(pair.getKey())).isEqualTo(pair.getValue());
         }
+
+        // test topK
+        for (int i = 0; i < 10; i++) {
+            int k = random.nextInt(CARDINALITY);
+
+            // without found set
+            RoaringBitmap32 expected = new RoaringBitmap32();
+            pairs.stream()
+                    .filter(pair -> pair.getValue() != null)
+                    .sorted(
+                            (x, y) -> {
+                                int result = 
y.getValue().compareTo(x.getValue());
+                                if (result == 0) {
+                                    return y.getKey().compareTo(x.getKey());
+                                }
+                                return result;
+                            })
+                    .map(Pair::getKey)
+                    .limit(k)
+                    .forEach(expected::add);
+            RoaringBitmap32 actual = bsi.topK(k, null);
+            assertThat(actual).isEqualTo(expected);
+
+            // with found set
+            expected = new RoaringBitmap32();
+            RoaringBitmap32 foundSet = new RoaringBitmap32();
+            for (int j = 0; j < random.nextInt(CARDINALITY); j++) {
+                foundSet.add(random.nextInt(ROW_COUNT));
+            }
+            pairs.stream()
+                    .filter(pair -> pair.getValue() != null)
+                    .filter(pair -> foundSet.contains(pair.getKey()))
+                    .sorted(
+                            (x, y) -> {
+                                int result = 
y.getValue().compareTo(x.getValue());
+                                if (result == 0) {
+                                    return y.getKey().compareTo(x.getKey());
+                                }
+                                return result;
+                            })
+                    .map(Pair::getKey)
+                    .limit(k)
+                    .forEach(expected::add);
+            actual = bsi.topK(k, foundSet);
+            assertThat(actual).isEqualTo(expected);
+        }
+
+        // test bottomK
+        for (int i = 0; i < 10; i++) {
+            int k = random.nextInt(CARDINALITY);
+            RoaringBitmap32 expected = new RoaringBitmap32();
+            pairs.stream()
+                    .filter(pair -> pair.getValue() != null)
+                    .sorted(
+                            (x, y) -> {
+                                int result = 
x.getValue().compareTo(y.getValue());
+                                if (result == 0) {
+                                    return y.getKey().compareTo(x.getKey());
+                                }
+                                return result;
+                            })
+                    .map(Pair::getKey)
+                    .limit(k)
+                    .forEach(expected::add);
+            RoaringBitmap32 actual = bsi.bottomK(k, null);
+            assertThat(actual).isEqualTo(expected);
+
+            // with found set
+            expected = new RoaringBitmap32();
+            RoaringBitmap32 foundSet = new RoaringBitmap32();
+            for (int j = 0; j < random.nextInt(CARDINALITY); j++) {
+                foundSet.add(random.nextInt(ROW_COUNT));
+            }
+            pairs.stream()
+                    .filter(pair -> pair.getValue() != null)
+                    .filter(pair -> foundSet.contains(pair.getKey()))
+                    .sorted(
+                            (x, y) -> {
+                                int result = 
x.getValue().compareTo(y.getValue());
+                                if (result == 0) {
+                                    return y.getKey().compareTo(x.getKey());
+                                }
+                                return result;
+                            })
+                    .map(Pair::getKey)
+                    .limit(k)
+                    .forEach(expected::add);
+            actual = bsi.bottomK(k, foundSet);
+            assertThat(actual).isEqualTo(expected);
+        }
     }
 
     @Test
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
index 372516de94..904f24826f 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
@@ -25,6 +25,8 @@ import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
 import org.apache.paimon.fs.ByteArraySeekableStream;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.Pair;
@@ -34,12 +36,18 @@ import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** test for {@link RangeBitmapFileIndex}. */
@@ -230,6 +238,117 @@ public class RangeBitmapFileIndexTest {
             assertThat(((BitmapIndexResult) 
reader.visitIsNotNull(fieldRef)).get())
                     .isEqualTo(bitmap);
         }
+
+        Comparator<Pair<Integer, BinaryString>> nullLastCompactor =
+                (x, y) -> {
+                    if (x.getValue() == null && y.getValue() == null) {
+                        return x.getKey().compareTo(y.getKey());
+                    }
+                    if (x.getValue() == null) {
+                        return 1;
+                    }
+                    if (y.getValue() == null) {
+                        return -1;
+                    }
+                    int result = x.getValue().compareTo(y.getValue());
+                    if (result == 0) {
+                        return -x.getKey().compareTo(y.getKey());
+                    }
+                    return result;
+                };
+        Comparator<Pair<Integer, BinaryString>> nullFirstCompactor =
+                (x, y) -> {
+                    if (x.getValue() == null && y.getValue() == null) {
+                        return x.getKey().compareTo(y.getKey());
+                    }
+                    if (x.getValue() == null) {
+                        return -1;
+                    }
+                    if (y.getValue() == null) {
+                        return 1;
+                    }
+                    int result = x.getValue().compareTo(y.getValue());
+                    if (result == 0) {
+                        return -x.getKey().compareTo(y.getKey());
+                    }
+                    return result;
+                };
+
+        for (int i = 0; i < 10; i++) {
+            int k = random.nextInt(ROW_COUNT);
+            RoaringBitmap32 foundSet = new RoaringBitmap32();
+            for (int j = 0; j < random.nextInt(BOUND); j++) {
+                foundSet.add(random.nextInt(ROW_COUNT));
+            }
+            RoaringBitmap32 expected = new RoaringBitmap32();
+
+            // test NULL_LAST without found set
+            TopN topN =
+                    new TopN(
+                            Collections.singletonList(
+                                    new SortValue(fieldRef, ASCENDING, 
NULLS_LAST)),
+                            k);
+            pairs.stream()
+                    .sorted(nullLastCompactor)
+                    .limit(k)
+                    .map(Pair::getKey)
+                    .forEach(expected::add);
+            RoaringBitmap32 actual = ((BitmapIndexResult) 
reader.visitTopN(topN, null)).get();
+            assertThat(actual).isEqualTo(expected);
+
+            // test NULL_LAST with found set
+            expected.clear();
+            topN =
+                    new TopN(
+                            Collections.singletonList(
+                                    new SortValue(fieldRef, ASCENDING, 
NULLS_LAST)),
+                            k);
+            pairs.stream()
+                    .filter(pair -> foundSet.contains(pair.getKey()))
+                    .sorted(nullLastCompactor)
+                    .limit(k)
+                    .map(Pair::getKey)
+                    .forEach(expected::add);
+            actual =
+                    ((BitmapIndexResult)
+                                    reader.visitTopN(topN, new 
BitmapIndexResult(() -> foundSet)))
+                            .get();
+            assertThat(actual).isEqualTo(expected);
+
+            // test NULL_FIRST without found set
+            expected.clear();
+            topN =
+                    new TopN(
+                            Collections.singletonList(
+                                    new SortValue(fieldRef, ASCENDING, 
NULLS_FIRST)),
+                            k);
+            pairs.stream()
+                    .sorted(nullFirstCompactor)
+                    .limit(k)
+                    .map(Pair::getKey)
+                    .forEach(expected::add);
+            actual = ((BitmapIndexResult) reader.visitTopN(topN, null)).get();
+            assertThat(actual).isEqualTo(expected);
+
+            // test NULL_FIRST with found set
+            expected.clear();
+            topN =
+                    new TopN(
+                            Collections.singletonList(
+                                    new SortValue(fieldRef, ASCENDING, 
NULLS_FIRST)),
+                            k);
+            pairs.stream()
+                    .filter(pair -> foundSet.contains(pair.getKey()))
+                    .sorted(nullFirstCompactor)
+                    .limit(k)
+                    .map(Pair::getKey)
+                    .forEach(expected::add);
+            actual =
+                    ((BitmapIndexResult)
+                                    reader.visitTopN(topN, new 
BitmapIndexResult(() -> foundSet)))
+                            .get();
+            assertThat(actual).isEqualTo(expected);
+        }
     }
 
     @Test
@@ -243,6 +362,9 @@ public class RangeBitmapFileIndexTest {
         writer.writeRecord(5);
         writer.writeRecord(7);
         writer.writeRecord(9);
+        writer.writeRecord(null);
+        writer.writeRecord(null);
+        writer.writeRecord(10);
 
         // build index
         byte[] bytes = writer.serializedBytes();
@@ -263,23 +385,63 @@ public class RangeBitmapFileIndexTest {
 
         // test gt
         assertThat(((BitmapIndexResult) reader.visitGreaterThan(fieldRef, 
0)).get())
-                .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2, 3, 4));
+                .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2, 3, 4, 7));
         assertThat(((BitmapIndexResult) reader.visitGreaterThan(fieldRef, 
1)).get())
-                .isEqualTo(RoaringBitmap32.bitmapOf(1, 2, 3, 4));
+                .isEqualTo(RoaringBitmap32.bitmapOf(1, 2, 3, 4, 7));
         assertThat(((BitmapIndexResult) reader.visitGreaterThan(fieldRef, 
6)).get())
-                .isEqualTo(RoaringBitmap32.bitmapOf(3, 4));
+                .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 7));
         assertThat(((BitmapIndexResult) reader.visitGreaterThan(fieldRef, 
9)).get())
-                .isEqualTo(RoaringBitmap32.bitmapOf());
+                .isEqualTo(RoaringBitmap32.bitmapOf(7));
 
         // test gte
         assertThat(((BitmapIndexResult) reader.visitGreaterOrEqual(fieldRef, 
0)).get())
-                .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2, 3, 4));
+                .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2, 3, 4, 7));
         assertThat(((BitmapIndexResult) reader.visitGreaterOrEqual(fieldRef, 
1)).get())
-                .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2, 3, 4));
+                .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2, 3, 4, 7));
         assertThat(((BitmapIndexResult) reader.visitGreaterOrEqual(fieldRef, 
6)).get())
-                .isEqualTo(RoaringBitmap32.bitmapOf(3, 4));
+                .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 7));
         assertThat(((BitmapIndexResult) reader.visitGreaterOrEqual(fieldRef, 
9)).get())
-                .isEqualTo(RoaringBitmap32.bitmapOf(4));
+                .isEqualTo(RoaringBitmap32.bitmapOf(4, 7));
+
+        // test bottomK
+        BitmapIndexResult result =
+                new BitmapIndexResult(() -> RoaringBitmap32.bitmapOf(0, 3, 4, 
5));
+        TopN bottomNullFirst =
+                new TopN(
+                        Collections.singletonList(new SortValue(fieldRef, 
ASCENDING, NULLS_FIRST)),
+                        3);
+        assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullFirst, 
null)).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(0, 5, 6));
+        assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullFirst, 
result)).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 5));
+
+        TopN bottomNullLast =
+                new TopN(
+                        Collections.singletonList(new SortValue(fieldRef, 
ASCENDING, NULLS_LAST)),
+                        3);
+        assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullLast, 
null)).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2));
+        assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullLast, 
result)).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 4));
+
+        // test topK
+        TopN topNullFirst =
+                new TopN(
+                        Collections.singletonList(new SortValue(fieldRef, 
DESCENDING, NULLS_FIRST)),
+                        3);
+        assertThat(((BitmapIndexResult) reader.visitTopN(topNullFirst, 
null)).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(5, 6, 7));
+        assertThat(((BitmapIndexResult) reader.visitTopN(topNullFirst, 
result)).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 5));
+
+        TopN topNullLast =
+                new TopN(
+                        Collections.singletonList(new SortValue(fieldRef, 
DESCENDING, NULLS_LAST)),
+                        3);
+        assertThat(((BitmapIndexResult) reader.visitTopN(topNullLast, 
null)).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 7));
+        assertThat(((BitmapIndexResult) reader.visitTopN(topNullLast, 
result)).get())
+                .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 4));
     }
 
     @Test
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java 
b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index 3ed4c278d9..b58907de1c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -23,7 +23,9 @@ import org.apache.paimon.fileindex.FileIndexResult;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.ListUtils;
 
 import java.io.IOException;
 import java.util.List;
@@ -36,40 +38,57 @@ public class FileIndexEvaluator {
             FileIO fileIO,
             TableSchema dataSchema,
             List<Predicate> dataFilter,
+            TopN topN,
             DataFilePathFactory dataFilePathFactory,
             DataFileMeta file)
             throws IOException {
-        if (dataFilter != null && !dataFilter.isEmpty()) {
+        FileIndexResult result = FileIndexResult.REMAIN;
+        if (ListUtils.isNullOrEmpty(dataFilter) && topN == null) {
+            return result;
+        }
+
+        FileIndexPredicate predicate = null;
+        try {
             byte[] embeddedIndex = file.embeddedIndex();
             if (embeddedIndex != null) {
-                try (FileIndexPredicate predicate =
-                        new FileIndexPredicate(embeddedIndex, 
dataSchema.logicalRowType())) {
-                    return predicate.evaluate(
-                            PredicateBuilder.and(dataFilter.toArray(new 
Predicate[0])));
+                predicate = new FileIndexPredicate(embeddedIndex, 
dataSchema.logicalRowType());
+            } else {
+                List<String> indexFiles =
+                        file.extraFiles().stream()
+                                .filter(
+                                        name ->
+                                                name.endsWith(
+                                                        
DataFilePathFactory.INDEX_PATH_SUFFIX))
+                                .collect(Collectors.toList());
+                if (indexFiles.isEmpty()) {
+                    return result;
                 }
-            }
-
-            List<String> indexFiles =
-                    file.extraFiles().stream()
-                            .filter(name -> 
name.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
-                            .collect(Collectors.toList());
-            if (!indexFiles.isEmpty()) {
                 if (indexFiles.size() > 1) {
                     throw new RuntimeException(
                             "Found more than one index file for one data file: 
"
                                     + String.join(" and ", indexFiles));
                 }
-                // go to file index check
-                try (FileIndexPredicate predicate =
+                predicate =
                         new FileIndexPredicate(
                                 
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
                                 fileIO,
-                                dataSchema.logicalRowType())) {
-                    return predicate.evaluate(
-                            PredicateBuilder.and(dataFilter.toArray(new 
Predicate[0])));
-                }
+                                dataSchema.logicalRowType());
+            }
+
+            // evaluate
+            if (!ListUtils.isNullOrEmpty(dataFilter)) {
+                result =
+                        predicate.evaluate(
+                                PredicateBuilder.and(dataFilter.toArray(new 
Predicate[0])));
+            } else if (topN != null) {
+                result = predicate.evaluateTopN(topN, result);
+            }
+
+            return result;
+        } finally {
+            if (predicate != null) {
+                predicate.close();
             }
         }
-        return FileIndexResult.REMAIN;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 48796eb6cd..376c45b5dc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -272,7 +272,7 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                     finalReadKeyType,
                     readValueType,
                     new FormatReaderMapping.Builder(
-                            formatDiscover, readTableFields, fieldsExtractor, 
filters),
+                            formatDiscover, readTableFields, fieldsExtractor, 
filters, null),
                     pathFactory.createDataFilePathFactory(partition, bucket),
                     options.fileReaderAsyncThreshold().getBytes(),
                     partition,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index bf6bf1ea22..acd0b39871 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -125,6 +125,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                         formatDiscover,
                         readRowType.getFields(),
                         schema -> 
rowTypeWithRowLineage(schema.logicalRowType(), true).getFields(),
+                        null,
                         null);
 
         List<List<DataFileMeta>> splitByRowId = 
DataEvolutionSplitGenerator.split(files);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 6aac9de937..44374a503a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -38,6 +38,7 @@ import org.apache.paimon.io.FileIndexEvaluator;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.EmptyFileRecordReader;
 import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.ReaderSupplier;
@@ -82,6 +83,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
 
     private RowType readRowType;
     @Nullable private List<Predicate> filters;
+    @Nullable private TopN topN;
 
     public RawFileSplitRead(
             FileIO fileIO,
@@ -127,6 +129,12 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
         return this;
     }
 
    /**
     * Sets the TopN hint to be evaluated against file indexes when readers are created;
     * {@code null} disables the pushdown. The hint is handed to the format reader mapping
     * builder and used by FileIndexEvaluator to skip files.
     */
    @Override
    public SplitRead<InternalRow> withTopN(@Nullable TopN topN) {
        this.topN = topN;
        return this;
    }
+
     @Override
     public RecordReader<InternalRow> createReader(DataSplit split) throws 
IOException {
         if (!split.beforeFiles().isEmpty()) {
@@ -164,7 +172,8 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                             }
                             return schema.fields();
                         },
-                        filters);
+                        filters,
+                        topN);
 
         for (DataFileMeta file : files) {
             suppliers.add(
@@ -220,6 +229,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                             fileIO,
                             formatReaderMapping.getDataSchema(),
                             formatReaderMapping.getDataFilters(),
+                            formatReaderMapping.getTopN(),
                             dataFilePathFactory,
                             file);
             if (!fileIndexResult.remain()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
index 1722aa53ed..e4a8ab5f3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SplitRead.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
@@ -44,6 +45,10 @@ public interface SplitRead<T> {
 
     SplitRead<T> withFilter(@Nullable Predicate predicate);
 
    /**
     * Pushes a TopN hint down to this split read. The default implementation ignores the
     * hint and returns {@code this}; implementations that can exploit a file index
     * override it.
     */
    default SplitRead<T> withTopN(@Nullable TopN topN) {
        return this;
    }
+
     /** Create a {@link RecordReader} from split. */
     RecordReader<T> createReader(DataSplit split) throws IOException;
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index 83e26301ea..cdb66483f5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.splitread.SplitReadConfig;
@@ -45,6 +46,7 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
 
     @Nullable private RowType readType = null;
     private Predicate predicate = null;
+    private TopN topN = null;
 
     public AppendTableRead(
             List<Function<SplitReadConfig, SplitReadProvider>> 
providerFactories,
@@ -71,6 +73,7 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
             read = read.withReadType(readType);
         }
         read.withFilter(predicate);
+        read.withTopN(topN);
     }
 
     @Override
@@ -86,6 +89,13 @@ public final class AppendTableRead extends 
AbstractDataTableRead {
         return this;
     }
 
+    @Override
+    public InnerTableRead withTopN(TopN topN) {
+        initialized().forEach(r -> r.withTopN(topN));
+        this.topN = topN;
+        return this;
+    }
+
     @Override
     public RecordReader<InternalRow> reader(Split split) throws IOException {
         DataSplit dataSplit = (DataSplit) split;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
index 3a8ceeb353..1e34e911ae 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableRead.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.types.RowType;
 
 import java.util.List;
@@ -50,6 +51,10 @@ public interface InnerTableRead extends TableRead {
         throw new UnsupportedOperationException();
     }
 
+    default InnerTableRead withTopN(TopN topN) {
+        return this;
+    }
+
     default InnerTableRead forceKeepDelete() {
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index e544182aa7..83dcb2e33e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 
@@ -132,6 +133,12 @@ public interface ReadBuilder extends Serializable {
     /** the row number pushed down. */
     ReadBuilder withLimit(int limit);
 
+    /**
+     * Push a TopN filter down to the reader. Filtering is best-effort: the returned data may
+     * still contain rows outside the TopN, so callers must apply the sort and limit again.
+     */
+    ReadBuilder withTopN(TopN topN);
+
     /**
      * Specify the shard to be read, and allocate sharded files to read 
records. Note that this
      * method cannot be used simultaneously with {@link 
#withBucketFilter(Filter)}.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 3f8765f303..5d529aa41d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.table.InnerTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
@@ -47,6 +48,7 @@ public class ReadBuilderImpl implements ReadBuilder {
     private Predicate filter;
 
     private Integer limit = null;
+    private TopN topN = null;
 
     private Integer shardIndexOfThisSubtask;
     private Integer shardNumberOfParallelSubtasks;
@@ -128,6 +130,12 @@ public class ReadBuilderImpl implements ReadBuilder {
         return this;
     }
 
+    @Override
+    public ReadBuilder withTopN(TopN topN) {
+        this.topN = topN;
+        return this;
+    }
+
     @Override
     public ReadBuilder withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
         this.shardIndexOfThisSubtask = indexOfThisSubtask;
@@ -203,6 +211,9 @@ public class ReadBuilderImpl implements ReadBuilder {
         if (readType != null) {
             read.withReadType(readType);
         }
+        if (topN != null) {
+            read.withTopN(topN);
+        }
         return read;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 63bb700305..8422ed8e78 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -23,6 +23,7 @@ import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.schema.IndexCastMapping;
 import org.apache.paimon.schema.SchemaEvolutionUtil;
 import org.apache.paimon.schema.TableSchema;
@@ -60,6 +61,7 @@ public class FormatReaderMapping {
     private final TableSchema dataSchema;
     private final List<Predicate> dataFilters;
     private final Map<String, Integer> systemFields;
+    @Nullable private final TopN topN;
 
     public FormatReaderMapping(
             @Nullable int[] indexMapping,
@@ -69,7 +71,8 @@ public class FormatReaderMapping {
             FormatReaderFactory readerFactory,
             TableSchema dataSchema,
             List<Predicate> dataFilters,
-            Map<String, Integer> systemFields) {
+            Map<String, Integer> systemFields,
+            @Nullable TopN topN) {
         this.indexMapping = combine(indexMapping, trimmedKeyMapping);
         this.castMapping = castMapping;
         this.readerFactory = readerFactory;
@@ -77,6 +80,7 @@ public class FormatReaderMapping {
         this.dataSchema = dataSchema;
         this.dataFilters = dataFilters;
         this.systemFields = systemFields;
+        this.topN = topN;
     }
 
     private int[] combine(@Nullable int[] indexMapping, @Nullable int[] 
trimmedKeyMapping) {
@@ -130,6 +134,11 @@ public class FormatReaderMapping {
         return dataFilters;
     }
 
+    @Nullable
+    public TopN getTopN() {
+        return topN;
+    }
+
     /** Builder for {@link FormatReaderMapping}. */
     public static class Builder {
 
@@ -137,16 +146,19 @@ public class FormatReaderMapping {
         private final List<DataField> readFields;
         private final Function<TableSchema, List<DataField>> fieldsExtractor;
         @Nullable private final List<Predicate> filters;
+        @Nullable private final TopN topN;
 
         public Builder(
                 FileFormatDiscover formatDiscover,
                 List<DataField> readFields,
                 Function<TableSchema, List<DataField>> fieldsExtractor,
-                @Nullable List<Predicate> filters) {
+                @Nullable List<Predicate> filters,
+                @Nullable TopN topN) {
             this.formatDiscover = formatDiscover;
             this.readFields = readFields;
             this.fieldsExtractor = fieldsExtractor;
             this.filters = filters;
+            this.topN = topN;
         }
 
         /**
@@ -208,7 +220,8 @@ public class FormatReaderMapping {
                             .createReaderFactory(actualReadRowType, 
readFilters),
                     dataSchema,
                     readFilters,
-                    systemFields);
+                    systemFields,
+                    topN);
         }
 
         public FormatReaderMapping build(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index a6a0e19732..3188f3223c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.fileindex.bitmap.BitmapFileIndexFactory;
 import org.apache.paimon.fileindex.bloomfilter.BloomFilterFileIndexFactory;
 import org.apache.paimon.fileindex.bsi.BitSliceIndexBitmapFileIndexFactory;
+import org.apache.paimon.fileindex.rangebitmap.RangeBitmapFileIndexFactory;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -37,9 +38,12 @@ import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -56,8 +60,10 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RoaringBitmap32;
 
 import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetOutputFormat;
 
@@ -74,6 +80,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.PriorityQueue;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -816,6 +823,103 @@ public class AppendOnlySimpleTableTest extends 
SimpleTableTestBase {
         }
     }
 
+    @Test
+    public void testRangeBitmapIndexTopNFilter() throws Exception {
+        RowType rowType =
+                RowType.builder()
+                        .field("id", DataTypes.STRING())
+                        .field("event", DataTypes.STRING())
+                        .field("price", DataTypes.INT())
+                        .build();
+        // in unaware-bucket mode, we split files into splits all the time
+        FileStoreTable table =
+                createUnawareBucketFileStoreTable(
+                        rowType,
+                        options -> {
+                            options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+                            options.set(WRITE_ONLY, true);
+                            options.set(
+                                    FileIndexOptions.FILE_INDEX
+                                            + "."
+                                            + 
RangeBitmapFileIndexFactory.RANGE_BITMAP
+                                            + "."
+                                            + CoreOptions.COLUMNS,
+                                    "price");
+                            options.set(ParquetOutputFormat.BLOCK_SIZE, 
"1048576");
+                            options.set(
+                                    
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+                            
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+                        });
+
+        int bound = 300000;
+        int rowCount = 1000000;
+        Random random = new Random();
+        int k = random.nextInt(100) + 1;
+        PriorityQueue<Integer> expected = new PriorityQueue<>(k, 
Integer::compareTo);
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        for (int j = 0; j < rowCount; j++) {
+            int next = random.nextInt(bound);
+            BinaryString uuid = 
BinaryString.fromString(UUID.randomUUID().toString());
+            write.write(GenericRow.of(uuid, uuid, next));
+
+            // TopK expected
+            if (expected.size() < k) {
+                expected.offer(next);
+            } else if (expected.peek() <= next) {
+                expected.poll();
+                expected.offer(next);
+            }
+        }
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+
+        // test TopK index
+        {
+            RoaringBitmap32 bitmap = new RoaringBitmap32();
+            expected.forEach(bitmap::add);
+            DataField field = rowType.getField("price");
+            SortValue sort =
+                    new SortValue(
+                            new FieldRef(field.id(), field.name(), 
field.type()),
+                            SortValue.SortDirection.DESCENDING,
+                            SortValue.NullOrdering.NULLS_LAST);
+            TopN topN = new TopN(Collections.singletonList(sort), k);
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    table.newRead().withTopN(topN).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            RoaringBitmap32 actual = new RoaringBitmap32();
+            reader.forEachRemaining(
+                    row -> {
+                        cnt.incrementAndGet();
+                        actual.add(row.getInt(2));
+                    });
+            assertThat(cnt.get()).isEqualTo(k);
+            assertThat(actual).isEqualTo(bitmap);
+            reader.close();
+        }
+
+        // test TopK without index
+        {
+            DataField field = rowType.getField("id");
+            SortValue sort =
+                    new SortValue(
+                            new FieldRef(field.id(), field.name(), 
field.type()),
+                            SortValue.SortDirection.DESCENDING,
+                            SortValue.NullOrdering.NULLS_LAST);
+            TopN topN = new TopN(Collections.singletonList(sort), k);
+            TableScan.Plan plan = table.newScan().plan();
+            RecordReader<InternalRow> reader =
+                    table.newRead().withTopN(topN).createReader(plan.splits());
+            AtomicInteger cnt = new AtomicInteger(0);
+            reader.forEachRemaining(row -> cnt.incrementAndGet());
+            assertThat(cnt.get()).isEqualTo(rowCount);
+            reader.close();
+        }
+    }
+
     @Test
     public void testWithShardAppendTable() throws Exception {
         FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, 
-1));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
index 0444d5644d..af7fba5401 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/FormatReaderMappingTest.java
@@ -135,7 +135,8 @@ public class FormatReaderMappingTest {
                         null,
                         null,
                         null,
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        null);
 
         Assertions.assertThat(formatReaderMapping.getIndexMapping())
                 .containsExactly(0, 1, 0, -1, 2);
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 67d91652b8..e8fe9a9c40 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.table.InnerTable
 
 import org.apache.spark.sql.PaimonUtils.fieldReference
@@ -36,6 +36,7 @@ case class PaimonScan(
     reservedFilters: Seq[Filter],
     override val pushDownLimit: Option[Int],
     // no usage, just for compile compatibility
+    override val pushDownTopN: Option[TopN],
     bucketedScanDisabled: Boolean = true)
   extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
   with SupportsRuntimeFiltering {
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 0c356d9b13..a451df1e19 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
@@ -37,6 +37,7 @@ case class PaimonScan(
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
     override val pushDownLimit: Option[Int],
+    override val pushDownTopN: Option[TopN],
     bucketedScanDisabled: Boolean = false)
   extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
   with SupportsRuntimeFiltering
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index c64565d16a..ce1fe2e25f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder, TopN}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.table.{InnerTable, SpecialFields}
 import org.apache.paimon.table.source.ReadBuilder
@@ -35,6 +35,7 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
   def requiredSchema: StructType
   def filters: Seq[Predicate]
   def pushDownLimit: Option[Int] = None
+  def pushDownTopN: Option[TopN] = None
 
   lazy val tableRowType: RowType = {
     val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
@@ -71,6 +72,7 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
       _readBuilder.withFilter(pushedPredicate)
     }
     pushDownLimit.foreach(_readBuilder.withLimit)
+    pushDownTopN.foreach(_readBuilder.withTopN)
     _readBuilder.dropStats()
   }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 90be4027f8..bffa0a6bf8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -128,7 +128,12 @@ abstract class PaimonBaseScan(
     } else {
       ""
     }
-    s"PaimonScan: [${table.name}]" + pushedFiltersStr +
+    val pushedTopNFilterStr = if (pushDownTopN.nonEmpty) {
+      s", PushedTopNFilter: [${pushDownTopN.get.toString}]"
+    } else {
+      ""
+    }
+    s"PaimonScan: [${table.name}]" + pushedFiltersStr + pushedTopNFilterStr +
       pushDownLimit.map(limit => s", Limit: [$limit]").getOrElse("")
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index fe1e58a756..cb3b94647e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.table.InnerTable
 
 import org.apache.spark.internal.Logging
@@ -41,8 +41,16 @@ abstract class PaimonBaseScanBuilder(table: InnerTable)
 
   protected var pushDownLimit: Option[Int] = None
 
+  protected var pushDownTopN: Option[TopN] = None
+
   override def build(): Scan = {
-    PaimonScan(table, requiredSchema, pushedPaimonPredicates, reservedFilters, 
pushDownLimit)
+    PaimonScan(
+      table,
+      requiredSchema,
+      pushedPaimonPredicates,
+      reservedFilters,
+      pushDownLimit,
+      pushDownTopN)
   }
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index eef1787d99..0ceb36d240 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions.BucketFunctionType
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.spark.commands.BucketExpression.quote
 import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable, Table}
 import org.apache.paimon.table.source.{DataSplit, Split}
@@ -40,6 +40,7 @@ case class PaimonScan(
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
     override val pushDownLimit: Option[Int],
+    override val pushDownTopN: Option[TopN],
     bucketedScanDisabled: Boolean = false)
   extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
   with SupportsRuntimeV2Filtering
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 10910da298..899c204718 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -18,17 +18,23 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, 
PredicateBuilder}
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.predicate._
+import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
 import org.apache.paimon.spark.aggregate.{AggregatePushDownUtils, 
LocalAggregator}
-import org.apache.paimon.table.{FileStoreTable, InnerTable, Table}
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, FileStoreTable, 
InnerTable}
 import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.sql.PaimonUtils
+import org.apache.spark.sql.connector.expressions
+import org.apache.spark.sql.connector.expressions.{NamedReference, SortOrder}
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
 import org.apache.spark.sql.connector.expressions.filter.{Predicate => 
SparkPredicate}
-import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownAggregates, 
SupportsPushDownLimit, SupportsPushDownV2Filters}
+import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.sources.Filter
 
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -36,7 +42,8 @@ class PaimonScanBuilder(table: InnerTable)
   extends PaimonBaseScanBuilder(table)
   with SupportsPushDownV2Filters
   with SupportsPushDownLimit
-  with SupportsPushDownAggregates {
+  with SupportsPushDownAggregates
+  with SupportsPushDownTopN {
 
   private var localScan: Option[Scan] = None
 
@@ -90,6 +97,63 @@ class PaimonScanBuilder(table: InnerTable)
     false
   }
 
+  override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
+    if (hasPostScanPredicates) {
+      return false
+    }
+
+    if (!table.isInstanceOf[AppendOnlyFileStoreTable]) {
+      return false
+    }
+
+    val coreOptions = CoreOptions.fromMap(table.options())
+    if (coreOptions.deletionVectorsEnabled()) {
+      return false
+    }
+
+    if (orders.length != 1) {
+      return false
+    }
+
+    val order = orders(0)
+    if (!order.expression().isInstanceOf[NamedReference]) {
+      return false
+    }
+
+    val fieldName = orders.head.expression() match {
+      case nr: NamedReference => nr.fieldNames.mkString(".")
+      case _ => return false
+    }
+
+    val rowType = table.rowType()
+    if (rowType.notContainsField(fieldName)) {
+      return false
+    }
+
+    val field = rowType.getField(fieldName)
+    val ref = new FieldRef(field.id(), field.name(), field.`type`())
+
+    val nullOrdering = order.nullOrdering() match {
+      case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
+      case expressions.NullOrdering.NULLS_FIRST => NullOrdering.NULLS_FIRST
+      case _ => return false
+    }
+
+    val direction = order.direction() match {
+      case expressions.SortDirection.DESCENDING => SortDirection.DESCENDING
+      case expressions.SortDirection.ASCENDING => SortDirection.ASCENDING
+      case _ => return false
+    }
+
+    val sort = new SortValue(ref, direction, nullOrdering)
+    pushDownTopN = Some(new TopN(Collections.singletonList(sort), limit))
+
+    // Return false (TopN not completely pushed): the push-down is best-effort only,
+    // so Spark must still apply the Sort and Limit operators on top of the scan.
+    false
+  }
+
+  override def isPartiallyPushed: Boolean = super.isPartiallyPushed
+
   override def supportCompletePushDown(aggregation: Aggregation): Boolean = {
     // for now, we only support complete push down, so there is no difference 
with `pushAggregation`
     pushAggregation(aggregation)
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index b3ec74feb4..35190bac51 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -24,8 +24,8 @@ import org.apache.paimon.table.source.DataSplit
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY
-import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownLimit}
+import 
org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, 
LIMIT, SORT}
+import org.apache.spark.sql.connector.read.{ScanBuilder, 
SupportsPushDownLimit, SupportsPushDownTopN}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.assertj.core.api.{Assertions => assertj}
 import org.junit.jupiter.api.Assertions
@@ -281,6 +281,68 @@ abstract class PaimonPushDownTestBase extends 
PaimonSparkTestBase {
     }
   }
 
+  test("Paimon pushDown: topN for append-only tables") {
+    assume(gteqSpark3_3)
+    spark.sql("""
+                |CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED 
BY (pt)
+                |TBLPROPERTIES ('file-index.range-bitmap.columns'='id')
+                |""".stripMargin)
+    Assertions.assertTrue(getScanBuilder().isInstanceOf[SupportsPushDownTopN])
+
+    spark.sql("""
+                |INSERT INTO T VALUES
+                |(1, "a1", "2023"),
+                |(1, "a1", "2023"),
+                |(3, "d1", "2023"),
+                |(4, "e1", "2023")
+                |""".stripMargin)
+    spark.sql("""
+                |INSERT INTO T VALUES
+                |(5, "a2", "2025"),
+                |(NULL, "b2", "2025"),
+                |(6, "c2", "2025"),
+                |(7, "d2", "2025"),
+                |(8, "e2", "2025")
+                |""".stripMargin)
+    spark.sql("""
+                |INSERT INTO T VALUES
+                |(5, "a3", "2023"),
+                |(9, "a3", "2023"),
+                |(2, "c3", "2025"),
+                |(NULL, "b2", "2025")
+                |""".stripMargin)
+
+    // test ASC
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id ASC NULLS LAST LIMIT 3"),
+      Row(1, "a1", "2023") :: Row(1, "a1", "2023") :: Row(2, "c3", "2025") :: 
Nil)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id ASC NULLS FIRST LIMIT 3"),
+      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(1, "a1", 
"2023") :: Nil)
+
+    // test DESC
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id DESC NULLS LAST LIMIT 3"),
+      Row(9, "a3", "2023") :: Row(8, "e2", "2025") :: Row(7, "d2", "2025") :: 
Nil)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id DESC NULLS FIRST LIMIT 3"),
+      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(9, "a3", 
"2023") :: Nil)
+
+    // test with partition
+    checkAnswer(
+      spark.sql("SELECT * FROM T WHERE pt='2023' ORDER BY id DESC LIMIT 3"),
+      Row(9, "a3", "2023") :: Row(5, "a3", "2023") :: Row(4, "e1", "2023") :: 
Nil)
+    checkAnswer(
+      spark.sql("SELECT * FROM T WHERE pt='2025' ORDER BY id ASC LIMIT 3"),
+      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(2, "c3", 
"2025") :: Nil)
+
+    // test plan
+    val df1 = spark.sql("SELECT * FROM T ORDER BY id DESC LIMIT 1")
+    val qe1 = df1.queryExecution
+    Assertions.assertTrue(qe1.optimizedPlan.containsPattern(SORT))
+    Assertions.assertTrue(qe1.optimizedPlan.containsPattern(LIMIT))
+  }
+
   test(s"Paimon pushdown: parquet in-filter") {
     withTable("T") {
       spark.sql(s"""


Reply via email to