This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 283332e9f77b04cea74abcba0cda5d9e9773fd4d
Author: Tan-JiaLiang <[email protected]>
AuthorDate: Wed Aug 27 17:01:52 2025 +0800

    [core] Support TopN pushdown to pick the DataSplits (#6143)
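
    A minimal usage sketch, mirroring the new tests in this commit (the
    "price" column, rowType, table and k are illustrative):

        DataField field = rowType.getField("price");
        FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
        TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
        // the batch scan can now prune DataSplits using min/max/null-count stats
        TableScan.Plan plan = table.newScan().withTopN(topN).plan();
        RecordReader<InternalRow> reader =
                table.newRead().withTopN(topN).createReader(plan.splits());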
---
 .../bitmap/RangeBitmapIndexPushDownBenchmark.java  |   4 +-
 .../java/org/apache/paimon/predicate/TopN.java     |   9 +
 .../java/org/apache/paimon/utils/ListUtils.java    |   3 +-
 .../rangebitmap/RangeBitmapFileIndexTest.java      |  46 +---
 .../apache/paimon/stats/SimpleStatsEvolution.java  |  26 ++
 .../java/org/apache/paimon/stats/StatsUtils.java   |  51 ++++
 .../paimon/table/AbstractFileStoreTable.java       |   1 +
 .../paimon/table/FallbackReadFileStoreTable.java   |   8 +
 .../paimon/table/source/AbstractDataTableScan.java |   2 +-
 .../org/apache/paimon/table/source/DataSplit.java  |  40 +++
 .../paimon/table/source/DataTableBatchScan.java    |  40 ++-
 .../apache/paimon/table/source/InnerTableScan.java |   5 +
 .../paimon/table/source/ReadBuilderImpl.java       |   3 +
 .../table/source/TopNDataSplitEvaluator.java       | 267 ++++++++++++++++++++
 .../paimon/table/system/ReadOptimizedTable.java    |   1 +
 .../paimon/table/AppendOnlySimpleTableTest.java    |  31 +--
 .../paimon/table/PrimaryKeySimpleTableTest.java    |  20 +-
 .../org/apache/paimon/table/source/SplitTest.java  |  34 +++
 .../apache/paimon/table/source/TableScanTest.java  | 278 +++++++++++++++++++++
 .../table/source/snapshot/ScannerTestBase.java     |  14 ++
 .../paimon/spark/ColumnPruningAndPushDown.scala    |   8 +-
 .../spark/aggregate/AggregatePushDownUtils.scala   |  32 +--
 .../paimon/spark/sql/PaimonPushDownTestBase.scala  |  63 +++--
 23 files changed, 855 insertions(+), 131 deletions(-)

diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
index 77c88afe56..c286b8727d 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RangeBitmapIndexPushDownBenchmark.java
@@ -32,7 +32,6 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
@@ -213,8 +212,7 @@ public class RangeBitmapIndexPushDownBenchmark {
                     () -> {
                         Table table = tables.get(name);
                         FieldRef ref = new FieldRef(0, "k", DataTypes.INT());
-                        SortValue sort = new SortValue(ref, DESCENDING, NULLS_LAST);
-                        TopN topN = new TopN(Collections.singletonList(sort), k);
+                        TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
                         List<Split> splits = table.newReadBuilder().newScan().plan().splits();
                         AtomicLong readCount = new AtomicLong(0);
                         try {
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
index 2ecb38cce3..e66de2c560 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
@@ -18,9 +18,12 @@
 
 package org.apache.paimon.predicate;
 
+import org.apache.paimon.predicate.SortValue.NullOrdering;
+import org.apache.paimon.predicate.SortValue.SortDirection;
 import org.apache.paimon.utils.Preconditions;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -37,6 +40,12 @@ public class TopN implements Serializable {
         this.limit = limit;
     }
 
+    public TopN(FieldRef ref, SortDirection direction, NullOrdering nullOrdering, int limit) {
+        SortValue order = new SortValue(ref, direction, nullOrdering);
+        this.orders = Collections.singletonList(order);
+        this.limit = limit;
+    }
+
     public List<SortValue> orders() {
         return orders;
     }
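
The new single-key constructor is shorthand for the existing list-based one; the two calls below build equivalent TopN specs (a sketch, with ref and k assumed in scope):

    TopN viaList =
            new TopN(Collections.singletonList(new SortValue(ref, DESCENDING, NULLS_LAST)), k);
    TopN viaShorthand = new TopN(ref, DESCENDING, NULLS_LAST, k);
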
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
index e870b68e1d..a93f9d016c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ListUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.utils;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -32,7 +33,7 @@ public class ListUtils {
         return list.get(index);
     }
 
-    public static <T> boolean isNullOrEmpty(List<T> list) {
+    public static <T> boolean isNullOrEmpty(Collection<T> list) {
         return list == null || list.isEmpty();
     }
 }
diff --git a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
index 904f24826f..90656c2081 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fileindex/rangebitmap/RangeBitmapFileIndexTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
 import org.apache.paimon.fs.ByteArraySeekableStream;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.VarCharType;
@@ -36,7 +35,6 @@ import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -283,11 +281,7 @@ public class RangeBitmapFileIndexTest {
             RoaringBitmap32 expected = new RoaringBitmap32();
 
             // test NULL_LAST without found set
-            TopN topN =
-                    new TopN(
-                            Collections.singletonList(
-                                    new SortValue(fieldRef, ASCENDING, NULLS_LAST)),
-                            k);
+            TopN topN = new TopN(fieldRef, ASCENDING, NULLS_LAST, k);
             pairs.stream()
                     .sorted(nullLastCompactor)
                     .limit(k)
@@ -298,11 +292,7 @@ public class RangeBitmapFileIndexTest {
 
             // test NULL_LAST with found set
             expected.clear();
-            topN =
-                    new TopN(
-                            Collections.singletonList(
-                                    new SortValue(fieldRef, ASCENDING, NULLS_LAST)),
-                            k);
+            topN = new TopN(fieldRef, ASCENDING, NULLS_LAST, k);
             pairs.stream()
                     .filter(pair -> foundSet.contains(pair.getKey()))
                     .sorted(nullLastCompactor)
@@ -317,11 +307,7 @@ public class RangeBitmapFileIndexTest {
 
             // test NULL_FIRST without found set
             expected.clear();
-            topN =
-                    new TopN(
-                            Collections.singletonList(
-                                    new SortValue(fieldRef, ASCENDING, NULLS_FIRST)),
-                            k);
+            topN = new TopN(fieldRef, ASCENDING, NULLS_FIRST, k);
             pairs.stream()
                     .sorted(nullFirstCompactor)
                     .limit(k)
@@ -332,11 +318,7 @@ public class RangeBitmapFileIndexTest {
 
             // test NULL_FIRST with found set
             expected.clear();
-            topN =
-                    new TopN(
-                            Collections.singletonList(
-                                    new SortValue(fieldRef, ASCENDING, NULLS_FIRST)),
-                            k);
+            topN = new TopN(fieldRef, ASCENDING, NULLS_FIRST, k);
             pairs.stream()
                     .filter(pair -> foundSet.contains(pair.getKey()))
                     .sorted(nullFirstCompactor)
@@ -406,38 +388,26 @@ public class RangeBitmapFileIndexTest {
         // test bottomK
         BitmapIndexResult result =
                 new BitmapIndexResult(() -> RoaringBitmap32.bitmapOf(0, 3, 4, 5));
-        TopN bottomNullFirst =
-                new TopN(
-                        Collections.singletonList(new SortValue(fieldRef, ASCENDING, NULLS_FIRST)),
-                        3);
+        TopN bottomNullFirst = new TopN(fieldRef, ASCENDING, NULLS_FIRST, 3);
         assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullFirst, null)).get())
                 .isEqualTo(RoaringBitmap32.bitmapOf(0, 5, 6));
         assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullFirst, result)).get())
                 .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 5));
 
-        TopN bottomNullLast =
-                new TopN(
-                        Collections.singletonList(new SortValue(fieldRef, ASCENDING, NULLS_LAST)),
-                        3);
+        TopN bottomNullLast = new TopN(fieldRef, ASCENDING, NULLS_LAST, 3);
         assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullLast, null)).get())
                 .isEqualTo(RoaringBitmap32.bitmapOf(0, 1, 2));
         assertThat(((BitmapIndexResult) reader.visitTopN(bottomNullLast, result)).get())
                 .isEqualTo(RoaringBitmap32.bitmapOf(0, 3, 4));
 
         // test topK
-        TopN topNullFirst =
-                new TopN(
-                        Collections.singletonList(new SortValue(fieldRef, DESCENDING, NULLS_FIRST)),
-                        3);
+        TopN topNullFirst = new TopN(fieldRef, DESCENDING, NULLS_FIRST, 3);
         assertThat(((BitmapIndexResult) reader.visitTopN(topNullFirst, null)).get())
                 .isEqualTo(RoaringBitmap32.bitmapOf(5, 6, 7));
         assertThat(((BitmapIndexResult) reader.visitTopN(topNullFirst, result)).get())
                 .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 5));
 
-        TopN topNullLast =
-                new TopN(
-                        Collections.singletonList(new SortValue(fieldRef, DESCENDING, NULLS_LAST)),
-                        3);
+        TopN topNullLast = new TopN(fieldRef, DESCENDING, NULLS_LAST, 3);
         assertThat(((BitmapIndexResult) reader.visitTopN(topNullLast, null)).get())
                 .isEqualTo(RoaringBitmap32.bitmapOf(3, 4, 7));
         assertThat(((BitmapIndexResult) reader.visitTopN(topNullLast, result)).get())
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
index b1c7cfebee..ad9f4f4b22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolution.java
@@ -88,6 +88,32 @@ public class SimpleStatsEvolution {
         return result;
     }
 
+    public InternalArray evolution(
+            InternalArray array, Long rowCount, @Nullable List<String> denseFields) {
+        InternalArray nullCounts = array;
+
+        if (denseFields != null && denseFields.isEmpty()) {
+            // optimize for empty dense fields
+            nullCounts = emptyNullCounts;
+        } else if (denseFields != null) {
+            int[] denseIndexMapping =
+                    indexMappings.computeIfAbsent(
+                            denseFields,
+                            k -> fieldNames.stream().mapToInt(denseFields::indexOf).toArray());
+            nullCounts = ProjectedArray.from(denseIndexMapping).replaceArray(nullCounts);
+        }
+
+        if (indexMapping != null) {
+            if (rowCount == null) {
+                throw new RuntimeException("Schema Evolution for stats needs row count.");
+            }
+
+            nullCounts = new NullCountsEvoArray(indexMapping, nullCounts, rowCount);
+        }
+
+        return nullCounts;
+    }
+
     public Result evolution(
             SimpleStats stats, @Nullable Long rowCount, @Nullable List<String> denseFields) {
         InternalRow minValues = stats.minValues();
diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java b/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java
new file mode 100644
index 0000000000..ec52cda0d9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/stats/StatsUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.stats;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TinyIntType;
+
+/** Utils for Stats. */
+public class StatsUtils {
+
+    public static boolean minmaxAvailable(DataType type) {
+        // Do not push down complex types.
+        // Do not push down Timestamp: the INT96 sort order is undefined and
+        // Parquet does not return statistics for INT96.
+        // Do not push down Parquet Binary: min/max could be truncated
+        // (https://issues.apache.org/jira/browse/PARQUET-1685), and Parquet Binary
+        // could be Spark StringType, BinaryType or DecimalType.
+        // ORC is excluded for the same reason.
+        return type instanceof BooleanType
+                || type instanceof TinyIntType
+                || type instanceof SmallIntType
+                || type instanceof IntType
+                || type instanceof BigIntType
+                || type instanceof FloatType
+                || type instanceof DoubleType
+                || type instanceof DateType;
+    }
+}
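
For reference, a quick sketch of what minmaxAvailable() accepts (DataTypes factories from paimon-common; the comments paraphrase the rationale above):

    StatsUtils.minmaxAvailable(DataTypes.INT());    // true: numeric min/max is reliable
    StatsUtils.minmaxAvailable(DataTypes.DATE());   // true
    StatsUtils.minmaxAvailable(DataTypes.STRING()); // false: binary min/max may be truncated (PARQUET-1685)
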
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 6757c8ef38..db0ca5e5ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -268,6 +268,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
     public DataTableBatchScan newScan() {
         return new DataTableBatchScan(
                 tableSchema,
+                schemaManager(),
                 coreOptions(),
                 newSnapshotReader(),
                 catalogEnvironment.tableQueryAuth(coreOptions()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index f53cfaa985..11bc764a57 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.DataFilePlan;
@@ -366,6 +367,13 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
             return this;
         }
 
+        @Override
+        public InnerTableScan withTopN(TopN topN) {
+            mainScan.withTopN(topN);
+            fallbackScan.withTopN(topN);
+            return this;
+        }
+
         @Override
         public InnerTableScan dropStats() {
             mainScan.dropStats();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index b019f5dbda..67acb44066 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -78,7 +78,7 @@ abstract class AbstractDataTableScan implements DataTableScan {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataTableScan.class);
 
-    private final TableSchema schema;
+    protected final TableSchema schema;
     private final CoreOptions options;
     protected final SnapshotReader snapshotReader;
     private final TableQueryAuth queryAuth;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 1feb72afa3..276cf5fe50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMeta08Serializer;
@@ -34,6 +35,7 @@ import org.apache.paimon.predicate.CompareUtils;
 import org.apache.paimon.stats.SimpleStatsEvolution;
 import org.apache.paimon.stats.SimpleStatsEvolutions;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.FunctionWithIOException;
 import org.apache.paimon.utils.InternalRowUtils;
 import org.apache.paimon.utils.SerializationUtils;
@@ -44,13 +46,16 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.io.DataFilePathFactory.INDEX_PATH_SUFFIX;
+import static org.apache.paimon.utils.ListUtils.isNullOrEmpty;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -153,6 +158,21 @@ public class DataSplit implements Split {
         return partialMergedRowCount();
     }
 
+    public boolean statsAvailable(Set<String> columns) {
+        if (isNullOrEmpty(columns)) {
+            return false;
+        }
+
+        return dataFiles.stream()
+                .map(DataFileMeta::valueStatsCols)
+                .allMatch(
+                        valueStatsCols ->
+                                // valueStatsCols == null means statistics exist
+                                // for all columns
+                                valueStatsCols == null
+                                        || new HashSet<>(valueStatsCols).containsAll(columns));
+    }
+
     public Object minValue(int fieldIndex, DataField dataField, SimpleStatsEvolutions evolutions) {
         Object minValue = null;
         for (DataFileMeta dataFile : dataFiles) {
@@ -191,6 +211,26 @@ public class DataSplit implements Split {
         return maxValue;
     }
 
+    public Long nullCount(int fieldIndex, SimpleStatsEvolutions evolutions) {
+        Long sum = null;
+        for (DataFileMeta dataFile : dataFiles) {
+            SimpleStatsEvolution evolution = evolutions.getOrCreate(dataFile.schemaId());
+            InternalArray nullCounts =
+                    evolution.evolution(
+                            dataFile.valueStats().nullCounts(),
+                            dataFile.rowCount(),
+                            dataFile.valueStatsCols());
+            Long nullCount =
+                    (Long) InternalRowUtils.get(nullCounts, fieldIndex, DataTypes.BIGINT());
+            if (sum == null) {
+                sum = nullCount;
+            } else if (nullCount != null) {
+                sum += nullCount;
+            }
+        }
+        return sum;
+    }
+
     /**
      * Obtain merged row count as much as possible. There are two scenarios where accurate row count
      * can be calculated:
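
A hedged sketch of how the new split-level accessors combine (idx, field and evolutions set up as in TopNDataSplitEvaluator below; the column name is illustrative):

    Set<String> cols = Collections.singleton("price");
    if (split.statsAvailable(cols)) { // every file carries stats for these columns
        Object min = split.minValue(idx, field, evolutions);
        Object max = split.maxValue(idx, field, evolutions);
        Long nulls = split.nullCount(idx, evolutions); // summed across the split's files
    }
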
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index d7f02c06ad..ec8eb8ee0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -37,16 +39,20 @@ public class DataTableBatchScan extends AbstractDataTableScan {
     private boolean hasNext;
 
     private Integer pushDownLimit;
+    private TopN topN;
+
+    private final SchemaManager schemaManager;
 
     public DataTableBatchScan(
             TableSchema schema,
+            SchemaManager schemaManager,
             CoreOptions options,
             SnapshotReader snapshotReader,
             TableQueryAuth queryAuth) {
         super(schema, options, snapshotReader, queryAuth);
 
         this.hasNext = true;
-
+        this.schemaManager = schemaManager;
         if (!schema.primaryKeys().isEmpty() && options.batchScanSkipLevel0()) {
             if (options.toConfiguration()
                     .get(CoreOptions.BATCH_SCAN_MODE)
@@ -71,6 +77,12 @@ public class DataTableBatchScan extends AbstractDataTableScan {
         return this;
     }
 
+    @Override
+    public InnerTableScan withTopN(TopN topN) {
+        this.topN = topN;
+        return this;
+    }
+
     @Override
     public TableScan.Plan plan() {
         authQuery();
@@ -82,8 +94,9 @@ public class DataTableBatchScan extends AbstractDataTableScan {
         if (hasNext) {
             hasNext = false;
             StartingScanner.Result result = startingScanner.scan(snapshotReader);
-            StartingScanner.Result limitedResult = applyPushDownLimit(result);
-            return DataFilePlan.fromResult(limitedResult);
+            result = applyPushDownLimit(result);
+            result = applyPushDownTopN(result);
+            return DataFilePlan.fromResult(result);
         } else {
             throw new EndOfScanException();
         }
@@ -123,6 +136,27 @@ public class DataTableBatchScan extends AbstractDataTableScan {
         return result;
     }
 
+    private StartingScanner.Result applyPushDownTopN(StartingScanner.Result result) {
+        if (topN == null
+                || pushDownLimit != null
+                || !(result instanceof ScannedResult)
+                || !schema.primaryKeys().isEmpty()
+                || options().deletionVectorsEnabled()) {
+            return result;
+        }
+
+        SnapshotReader.Plan plan = ((ScannedResult) result).plan();
+        List<DataSplit> splits = plan.dataSplits();
+        if (splits.isEmpty()) {
+            return result;
+        }
+
+        TopNDataSplitEvaluator evaluator = new TopNDataSplitEvaluator(schema, schemaManager);
+        List<Split> topNSplits = new ArrayList<>(evaluator.evaluate(topN, splits));
+        SnapshotReader.Plan newPlan = new PlanImpl(plan.watermark(), plan.snapshotId(), topNSplits);
+        return new ScannedResult(newPlan);
+    }
+
     @Override
     public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
         snapshotReader.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
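
Note that applyPushDownTopN() defers to an existing limit pushdown (pushDownLimit != null short-circuits) and only fires for tables without primary keys and without deletion vectors. A sketch of the interaction (illustrative values; topN as above):

    InnerTableScan scan =
            table.newScan()
                    .withLimit(10)   // consumed by applyPushDownLimit
                    .withTopN(topN); // applyPushDownTopN then leaves the result unchanged
    TableScan.Plan plan = scan.plan();
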
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 89f15055d5..d0015445b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 
@@ -77,6 +78,10 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
+    default InnerTableScan withTopN(TopN topN) {
+        return this;
+    }
+
     default InnerTableScan dropStats() {
         // do nothing, should implement this if need
         return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index e57a7b6382..662ac19858 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -167,6 +167,9 @@ public class ReadBuilderImpl implements ReadBuilder {
         if (limit != null) {
             tableScan.withLimit(limit);
         }
+        if (topN != null) {
+            tableScan.withTopN(topN);
+        }
         return tableScan;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
new file mode 100644
index 0000000000..87c6b23cfd
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source;
+
+import org.apache.paimon.predicate.CompareUtils;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.SortValue;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStatsEvolutions;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
+import static org.apache.paimon.stats.StatsUtils.minmaxAvailable;
+
+/** Evaluate DataSplit TopN result. */
+public class TopNDataSplitEvaluator {
+
+    private final Map<Long, TableSchema> tableSchemas;
+    private final TableSchema schema;
+    private final SchemaManager schemaManager;
+
+    public TopNDataSplitEvaluator(TableSchema schema, SchemaManager schemaManager) {
+        this.tableSchemas = new HashMap<>();
+        this.schema = schema;
+        this.schemaManager = schemaManager;
+    }
+
+    public List<DataSplit> evaluate(TopN topN, List<DataSplit> splits) {
+        // TODO: support sorting on multiple columns.
+        List<SortValue> orders = topN.orders();
+        if (orders.size() != 1) {
+            return splits;
+        }
+
+        int limit = topN.limit();
+        if (limit >= splits.size()) {
+            return splits;
+        }
+
+        SortValue order = orders.get(0);
+        DataType type = order.field().type();
+        if (!minmaxAvailable(type)) {
+            return splits;
+        }
+
+        return getTopNSplits(order, limit, splits);
+    }
+
+    private List<DataSplit> getTopNSplits(SortValue order, int limit, List<DataSplit> splits) {
+        FieldRef ref = order.field();
+        SortValue.SortDirection direction = order.direction();
+        SortValue.NullOrdering nullOrdering = order.nullOrdering();
+
+        int index = ref.index();
+        DataField field = schema.fields().get(index);
+        SimpleStatsEvolutions evolutions =
+                new SimpleStatsEvolutions((id) -> scanTableSchema(id).fields(), schema.id());
+
+        // extract the stats
+        List<DataSplit> results = new ArrayList<>();
+        List<Pair<Stats, DataSplit>> pairs = new ArrayList<>();
+        for (DataSplit split : splits) {
+            if (!split.rawConvertible()) {
+                return splits;
+            }
+
+            Set<String> cols = Collections.singleton(field.name());
+            if (!split.statsAvailable(cols)) {
+                results.add(split);
+                continue;
+            }
+
+            Object min = split.minValue(index, field, evolutions);
+            Object max = split.maxValue(index, field, evolutions);
+            Long nullCount = split.nullCount(index, evolutions);
+            Stats stats = new Stats(min, max, nullCount);
+            pairs.add(Pair.of(stats, split));
+        }
+
+        // pick the TopN splits
+        if (NULLS_FIRST.equals(nullOrdering)) {
+            results.addAll(pickNullFirstSplits(pairs, ref, direction, limit));
+        } else if (NULLS_LAST.equals(nullOrdering)) {
+            results.addAll(pickNullLastSplits(pairs, ref, direction, limit));
+        } else {
+            return splits;
+        }
+
+        return results;
+    }
+
+    private List<DataSplit> pickNullFirstSplits(
+            List<Pair<Stats, DataSplit>> pairs,
+            FieldRef field,
+            SortValue.SortDirection direction,
+            int limit) {
+        Comparator<Pair<Stats, DataSplit>> comparator;
+        if (ASCENDING.equals(direction)) {
+            comparator =
+                    (x, y) -> {
+                        Stats left = x.getKey();
+                        Stats right = y.getKey();
+                        int result = nullsFirst(left.nullCount, right.nullCount);
+                        if (result == 0) {
+                            result = asc(field, left.min, right.min);
+                        }
+                        return result;
+                    };
+        } else if (DESCENDING.equals(direction)) {
+            comparator =
+                    (x, y) -> {
+                        Stats left = x.getKey();
+                        Stats right = y.getKey();
+                        int result = nullsFirst(left.nullCount, right.nullCount);
+                        if (result == 0) {
+                            result = desc(field, left.max, right.max);
+                        }
+                        return result;
+                    };
+        } else {
+            return pairs.stream().map(Pair::getValue).collect(Collectors.toList());
+        }
+        pairs.sort(comparator);
+
+        long scanned = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        for (Pair<Stats, DataSplit> pair : pairs) {
+            Stats stats = pair.getKey();
+            DataSplit split = pair.getValue();
+            splits.add(split);
+            scanned += Math.max(stats.nullCount, 1);
+            if (scanned >= limit) {
+                break;
+            }
+        }
+        return splits;
+    }
+
+    private List<DataSplit> pickNullLastSplits(
+            List<Pair<Stats, DataSplit>> pairs,
+            FieldRef field,
+            SortValue.SortDirection direction,
+            int limit) {
+        Comparator<Pair<Stats, DataSplit>> comparator;
+        if (ASCENDING.equals(direction)) {
+            comparator =
+                    (x, y) -> {
+                        Stats left = x.getKey();
+                        Stats right = y.getKey();
+                        int result = asc(field, left.min, right.min);
+                        if (result == 0) {
+                            result = nullsLast(left.nullCount, right.nullCount);
+                        }
+                        return result;
+                    };
+        } else if (DESCENDING.equals(direction)) {
+            comparator =
+                    (x, y) -> {
+                        Stats left = x.getKey();
+                        Stats right = y.getKey();
+                        int result = desc(field, left.max, right.max);
+                        if (result == 0) {
+                            result = nullsLast(left.nullCount, right.nullCount);
+                        }
+                        return result;
+                    };
+        } else {
+            return pairs.stream().map(Pair::getValue).collect(Collectors.toList());
+        }
+
+        return pairs.stream()
+                .sorted(comparator)
+                .map(Pair::getValue)
+                .limit(limit)
+                .collect(Collectors.toList());
+    }
+
+    private int nullsFirst(Long left, Long right) {
+        if (left == null) {
+            return -1;
+        } else if (right == null) {
+            return 1;
+        } else {
+            return -Long.compare(left, right);
+        }
+    }
+
+    private int nullsLast(Long left, Long right) {
+        if (left == null) {
+            return -1;
+        } else if (right == null) {
+            return 1;
+        } else {
+            return Long.compare(left, right);
+        }
+    }
+
+    private int asc(FieldRef field, Object left, Object right) {
+        if (left == null) {
+            return -1;
+        } else if (right == null) {
+            return 1;
+        } else {
+            return CompareUtils.compareLiteral(field.type(), left, right);
+        }
+    }
+
+    private int desc(FieldRef field, Object left, Object right) {
+        if (left == null) {
+            return -1;
+        } else if (right == null) {
+            return 1;
+        } else {
+            return -CompareUtils.compareLiteral(field.type(), left, right);
+        }
+    }
+
+    private TableSchema scanTableSchema(long id) {
+        return tableSchemas.computeIfAbsent(
+                id, key -> key == schema.id() ? schema : schemaManager.schema(id));
+    }
+
+    /** The DataSplit's stats. */
+    private static class Stats {
+        Object min;
+        Object max;
+        Long nullCount;
+
+        public Stats(Object min, Object max, Long nullCount) {
+            this.min = min;
+            this.max = max;
+            this.nullCount = nullCount;
+        }
+    }
+}
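
A worked example of the NULLS_FIRST early exit in pickNullFirstSplits() (numbers are illustrative):

    // limit = 3; after sorting, the candidate splits carry null counts [2, 2, 0]
    // scanned accumulates Math.max(nullCount, 1) per picked split:
    //   after split A: scanned = 2 (2 < 3, continue)
    //   after split B: scanned = 4 (4 >= 3, stop)
    // => A and B are picked; splits without usable stats were already kept in "results"
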
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 0094313238..87d79bc537 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -149,6 +149,7 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
         CoreOptions options = wrapped.coreOptions();
         return new DataTableBatchScan(
                 wrapped.schema(),
+                schemaManager(),
                 options,
                 newSnapshotReader(wrapped),
                 wrapped.catalogEnvironment().tableQueryAuth(options));
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index f8056cc492..cf246b13b5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -43,7 +43,6 @@ import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
@@ -101,6 +100,8 @@ import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -882,13 +883,9 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
             RoaringBitmap32 bitmap = new RoaringBitmap32();
             expected.forEach(bitmap::add);
             DataField field = rowType.getField("price");
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
-            TableScan.Plan plan = table.newScan().plan();
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+            TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
+            TableScan.Plan plan = table.newScan().withTopN(topN).plan();
             RecordReader<InternalRow> reader =
                     table.newRead().withTopN(topN).createReader(plan.splits());
             AtomicInteger cnt = new AtomicInteger(0);
@@ -906,13 +903,9 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
         // test TopK without index
         {
             DataField field = rowType.getField("id");
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
-            TableScan.Plan plan = table.newScan().plan();
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+            TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
+            TableScan.Plan plan = table.newScan().withTopN(topN).plan();
             RecordReader<InternalRow> reader =
                     table.newRead().withTopN(topN).createReader(plan.splits());
             AtomicInteger cnt = new AtomicInteger(0);
@@ -933,12 +926,8 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
                             .build();
             table = createUnawareBucketFileStoreTable(rowType, configure);
             DataField field = rowType.getField("price");
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+            TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
             TableScan.Plan plan = table.newScan().plan();
             RecordReader<InternalRow> reader =
                     table.newRead().withTopN(topN).createReader(plan.splits());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index d4f1484230..ec0675a53a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -44,7 +44,6 @@ import org.apache.paimon.postpone.PostponeBucketWriter;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.SortValue;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
@@ -142,6 +141,9 @@ import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
 import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.apache.paimon.predicate.PredicateBuilder.and;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -1256,12 +1258,8 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
             int k = new Random().nextInt(100);
             RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(min, min + k);
             DataField field = table.schema().nameToFieldMap().get(indexColumnName);
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.ASCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+            TopN topN = new TopN(ref, ASCENDING, NULLS_LAST, k);
             TableScan.Plan plan = table.newScan().plan();
             RecordReader<InternalRow> reader =
                     table.newRead().withTopN(topN).createReader(plan.splits());
@@ -1282,12 +1280,8 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
             int k = new Random().nextInt(100);
             RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOfRange(max - k, max);
             DataField field = table.schema().nameToFieldMap().get(indexColumnName);
-            SortValue sort =
-                    new SortValue(
-                            new FieldRef(field.id(), field.name(), field.type()),
-                            SortValue.SortDirection.DESCENDING,
-                            SortValue.NullOrdering.NULLS_LAST);
-            TopN topN = new TopN(Collections.singletonList(sort), k);
+            FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+            TopN topN = new TopN(ref, DESCENDING, NULLS_LAST, k);
             TableScan.Plan plan = table.newScan().plan();
             RecordReader<InternalRow> reader =
                     table.newRead().withTopN(topN).createReader(plan.splits());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index 6eb11583f4..4d6d842f75 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -160,6 +160,40 @@ public class SplitTest {
         assertThat(split2.maxValue(3, floatField, evolutions)).isEqualTo(null);
     }
 
+    @Test
+    public void testSplitNullCount() {
+        Map<Long, List<DataField>> schemas = new HashMap<>();
+        DataField intField = new DataField(0, "c_int", new IntType());
+        DataField longField = new DataField(1, "c_long", new BigIntType());
+        schemas.put(1L, Arrays.asList(intField, longField));
+
+        // test common
+        BinaryRow min1 = newBinaryRow(new Object[] {10, 123L});
+        BinaryRow max1 = newBinaryRow(new Object[] {99, 456L});
+        SimpleStats valueStats1 = new SimpleStats(min1, max1, fromLongArray(new Long[] {5L, 1L}));
+
+        BinaryRow min2 = newBinaryRow(new Object[] {5, 0L});
+        BinaryRow max2 = newBinaryRow(new Object[] {90, 789L});
+        SimpleStats valueStats2 = new SimpleStats(min2, max2, fromLongArray(new Long[] {3L, 2L}));
+
+        DataFileMeta d1 = newDataFile(100, valueStats1, null);
+        DataFileMeta d2 = newDataFile(100, valueStats2, null);
+        DataSplit split1 = newDataSplit(true, Arrays.asList(d1, d2), null);
+
+        SimpleStatsEvolutions evolutions = new SimpleStatsEvolutions(schemas::get, 1);
+        assertThat(split1.nullCount(0, evolutions)).isEqualTo(8);
+        assertThat(split1.nullCount(1, evolutions)).isEqualTo(3);
+
+        // test schema evolution
+        DataField doubleField = new DataField(2, "c_double", new DoubleType());
+        schemas.put(2L, Arrays.asList(intField, longField, doubleField));
+        evolutions = new SimpleStatsEvolutions(schemas::get, 2);
+
+        assertThat(split1.nullCount(0, evolutions)).isEqualTo(8);
+        assertThat(split1.nullCount(1, evolutions)).isEqualTo(3);
+        assertThat(split1.nullCount(2, evolutions)).isEqualTo(200);
+    }
+
     @Test
     public void testSerializer() throws IOException {
         DataFileTestDataGenerator gen = DataFileTestDataGenerator.builder().build();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
index 5f565339f9..0fd381f918 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
@@ -18,16 +18,28 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.TopN;
+import org.apache.paimon.stats.SimpleStatsEvolutions;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.snapshot.ScannerTestBase;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_FIRST;
+import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
+import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
+import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -98,4 +110,270 @@ public class TableScanTest extends ScannerTestBase {
         write.close();
         commit.close();
     }
+
+    @Test
+    public void testPushDownTopN() throws Exception {
+        createAppendOnlyTable();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(3, 30, 300L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(4, 40, 400L));
+        write.write(rowData(5, 50, 500L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(rowData(6, null, 600L));
+        write.write(rowData(6, null, 600L));
+        write.write(rowData(6, 60, 600L));
+        write.write(rowData(7, null, 700L));
+        write.write(rowData(7, null, 700L));
+        write.write(rowData(7, 70, 700L));
+        commit.commit(2, write.prepareCommit(true, 2));
+        write.close();
+        commit.close();
+
+        // disable stats
+        FileStoreTable disableStatsTable =
+                table.copy(Collections.singletonMap("metadata.stats-mode", "none"));
+        write = disableStatsTable.newWrite(commitUser);
+        commit = disableStatsTable.newCommit(commitUser);
+
+        write.write(rowData(8, 80, 800L));
+        write.write(rowData(9, 90, 900L));
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        // no TopN pushed down
+        TableScan.Plan plan1 = table.newScan().plan();
+        assertThat(plan1.splits().size()).isEqualTo(9);
+
+        DataField field = table.schema().fields().get(1);
+        FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+        SimpleStatsEvolutions evolutions =
+                new SimpleStatsEvolutions(
+                        (id) -> table.schemaManager().schema(id).fields(), table.schema().id());
+
+        // with bottom1 null first
+        TableScan.Plan plan2 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan();
+        List<Split> splits2 = plan2.splits();
+        assertThat(splits2.size()).isEqualTo(3);
+        assertThat(((DataSplit) splits2.get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits2.get(1)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits2.get(2)).minValue(field.id(), field, evolutions))
+                .isEqualTo(60);
+        assertThat(((DataSplit) splits2.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2);
+
+        // with bottom1 null last
+        TableScan.Plan plan3 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_LAST, 1)).plan();
+        List<Split> splits3 = plan3.splits();
+        assertThat(splits3.size()).isEqualTo(3);
+        assertThat(((DataSplit) splits3.get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits3.get(1)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits3.get(2)).minValue(field.id(), field, evolutions))
+                .isEqualTo(10);
+
+        // with bottom5 null first
+        TableScan.Plan plan4 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 5)).plan();
+        List<Split> splits4 = plan4.splits();
+        assertThat(splits4.size()).isEqualTo(5);
+        assertThat(((DataSplit) splits4.get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits4.get(1)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits4.get(2)).minValue(field.id(), field, evolutions))
+                .isEqualTo(60);
+        assertThat(((DataSplit) splits4.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2);
+        assertThat(((DataSplit) splits4.get(3)).minValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+        assertThat(((DataSplit) splits4.get(3)).nullCount(field.id(), evolutions)).isEqualTo(2);
+        assertThat(((DataSplit) splits4.get(4)).minValue(field.id(), field, evolutions))
+                .isEqualTo(10);
+
+        // with bottom5 null last
+        TableScan.Plan plan5 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_LAST, 5)).plan();
+        List<Split> splits5 = plan5.splits();
+        assertThat(splits5.size()).isEqualTo(7);
+        assertThat(((DataSplit) splits5.get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits5.get(1)).minValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits5.get(2)).minValue(field.id(), field, evolutions))
+                .isEqualTo(10);
+        assertThat(((DataSplit) splits5.get(3)).minValue(field.id(), field, evolutions))
+                .isEqualTo(20);
+        assertThat(((DataSplit) splits5.get(4)).minValue(field.id(), field, evolutions))
+                .isEqualTo(30);
+        assertThat(((DataSplit) splits5.get(5)).minValue(field.id(), field, evolutions))
+                .isEqualTo(40);
+        assertThat(((DataSplit) splits5.get(6)).minValue(field.id(), field, evolutions))
+                .isEqualTo(50);
+
+        // with top1 null first
+        TableScan.Plan plan6 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_FIRST, 1)).plan();
+        List<Split> splits6 = plan6.splits();
+        assertThat(splits6.size()).isEqualTo(3);
+        assertThat(((DataSplit) splits6.get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits6.get(1)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits6.get(2)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+        assertThat(((DataSplit) splits6.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2);
+
+        // with top1 null last
+        TableScan.Plan plan7 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan();
+        List<Split> splits7 = plan7.splits();
+        assertThat(splits7.size()).isEqualTo(3);
+        assertThat(((DataSplit) splits7.get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits7.get(1)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits7.get(2)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+
+        // with top5 null first
+        TableScan.Plan plan8 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, 
NULLS_FIRST, 5)).plan();
+        List<Split> splits8 = plan8.splits();
+        assertThat(splits8.size()).isEqualTo(5);
+        assertThat(((DataSplit) splits8.get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits8.get(1)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits8.get(2)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+        assertThat(((DataSplit) splits8.get(2)).nullCount(field.id(), evolutions)).isEqualTo(2);
+        assertThat(((DataSplit) splits8.get(3)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(60);
+        assertThat(((DataSplit) splits8.get(3)).nullCount(field.id(), evolutions)).isEqualTo(2);
+        assertThat(((DataSplit) splits8.get(4)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(50);
+
+        // with top5 null last
+        TableScan.Plan plan9 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 5)).plan();
+        List<Split> splits9 = plan9.splits();
+        assertThat(splits9.size()).isEqualTo(7);
+        assertThat(((DataSplit) splits9.get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits9.get(1)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(null);
+        assertThat(((DataSplit) splits9.get(2)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(70);
+        assertThat(((DataSplit) splits9.get(3)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(60);
+        assertThat(((DataSplit) splits9.get(4)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(50);
+        assertThat(((DataSplit) splits9.get(5)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(40);
+        assertThat(((DataSplit) splits9.get(6)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(30);
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testPushDownTopNSchemaEvolution() throws Exception {
+        createAppendOnlyTable();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(3, 30, 300L));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+
+        // schema evolution: widen column 'a' from INT to BIGINT
+        updateColumn("a", DataTypes.BIGINT());
+        write = table.newWrite(commitUser);
+        commit = table.newCommit(commitUser);
+        write.write(rowData(4, 40L, 400L));
+        write.write(rowData(5, 50L, 500L));
+        write.write(rowData(6, 60L, 600L));
+        commit.commit(1, write.prepareCommit(true, 1));
+        write.close();
+        commit.close();
+
+        DataField field = table.schema().fields().get(1);
+        FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+        SimpleStatsEvolutions evolutions =
+                new SimpleStatsEvolutions(
+                        (id) -> table.schemaManager().schema(id).fields(), table.schema().id());
+
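+        // the first commit wrote 'a' as INT; its file stats should be read through the
+        // evolved BIGINT schema, so min/max surface as 60L/10L below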
+        TableScan.Plan plan1 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan();
+        assertThat(plan1.splits().size()).isEqualTo(1);
+        assertThat(((DataSplit) plan1.splits().get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(60L);
+        assertThat(((DataSplit) plan1.splits().get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(60L);
+
+        TableScan.Plan plan2 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan();
+        assertThat(plan2.splits().size()).isEqualTo(1);
+        assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions))
+                .isEqualTo(10L);
+        assertThat(((DataSplit) plan2.splits().get(0)).minValue(field.id(), field, evolutions))
+                .isEqualTo(10L);
+    }
+
+    @Test
+    public void testPushDownTopNOnlyNull() throws Exception {
+        createAppendOnlyTable();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, null, 100L));
+        write.write(rowData(2, null, 200L));
+        write.write(rowData(3, null, 300L));
+        commit.commit(0, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+
+        DataField field = table.schema().fields().get(1);
+        FieldRef ref = new FieldRef(field.id(), field.name(), field.type());
+        SimpleStatsEvolutions evolutions =
+                new SimpleStatsEvolutions(
+                        (id) -> table.schemaManager().schema(id).fields(), table.schema().id());
+
+        // the min/max stats are null here, but the null count is still available, so the split is kept.
+        TableScan.Plan plan1 =
+                table.newScan().withTopN(new TopN(ref, DESCENDING, NULLS_LAST, 1)).plan();
+        assertThat(plan1.splits().size()).isEqualTo(1);
+        assertThat(((DataSplit) plan1.splits().get(0)).nullCount(field.id(), evolutions))
+                .isEqualTo(1);
+        assertThat(((DataSplit) plan1.splits().get(0)).minValue(field.id(), field, evolutions))
+                .isNull();
+        assertThat(((DataSplit) plan1.splits().get(0)).maxValue(field.id(), field, evolutions))
+                .isNull();
+
+        TableScan.Plan plan2 =
+                table.newScan().withTopN(new TopN(ref, ASCENDING, NULLS_FIRST, 1)).plan();
+        assertThat(plan2.splits().size()).isEqualTo(1);
+        assertThat(((DataSplit) plan2.splits().get(0)).nullCount(field.id(), evolutions))
+                .isEqualTo(1);
+        assertThat(((DataSplit) plan2.splits().get(0)).minValue(field.id(), field, evolutions))
+                .isNull();
+        assertThat(((DataSplit) plan2.splits().get(0)).maxValue(field.id(), field, evolutions))
+                .isNull();
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index 2ed0d5c9b3..612b4ed49b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -32,6 +32,7 @@ import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.CatalogEnvironment;
@@ -171,6 +172,19 @@ public abstract class ScannerTestBase {
                fileIO, tablePath, tableSchema, conf, CatalogEnvironment.empty());
     }
 
+    protected void updateColumn(String columnName, DataType type) throws Exception {
+        TableSchema tableSchema =
+                table.schemaManager()
+                        .commitChanges(SchemaChange.updateColumnType(columnName, type));
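+        // recreate the table instance so later scans observe the evolved schema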
+        table =
+                FileStoreTableFactory.create(
+                        table.fileIO(),
+                        table.location(),
+                        tableSchema,
+                        new Options(table.options()),
+                        CatalogEnvironment.empty());
+    }
+
     protected List<Split> toSplits(List<DataSplit> dataSplits) {
         return new ArrayList<>(dataSplits);
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index ce1fe2e25f..1e3bb846cf 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -73,7 +73,13 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
     }
     pushDownLimit.foreach(_readBuilder.withLimit)
     pushDownTopN.foreach(_readBuilder.withTopN)
-    _readBuilder.dropStats()
+
+    // when TopN is not empty, we need the stats to pick the TopN DataSplits
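+    // (the split-level min/max and null-count stats are what the TopN evaluator
+    // compares, so dropping them here would disable the TopN split pruning)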
+    if (pushDownTopN.nonEmpty) {
+      _readBuilder
+    } else {
+      _readBuilder.dropStats()
+    }
   }
 
   final def metadataColumns: Seq[PaimonMetadataColumn] = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
index c6abec1acd..fd1ee59b5a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.aggregate
 
+import org.apache.paimon.stats.StatsUtils.minmaxAvailable
 import org.apache.paimon.table.Table
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types._
@@ -58,21 +59,12 @@ object AggregatePushDownUtils {
 
       val dataField = getDataFieldForCol(columnName)
 
-      dataField.`type`() match {
-        // not push down complex type
-        // not push down Timestamp because INT96 sort order is undefined,
-        // Parquet doesn't return statistics for INT96
-        // not push down Parquet Binary because min/max could be truncated
-        // (https://issues.apache.org/jira/browse/PARQUET-1685), Parquet Binary
-        // could be Spark StringType, BinaryType or DecimalType.
-        // not push down for ORC with same reason.
-        case _: BooleanType | _: TinyIntType | _: SmallIntType | _: IntType | _: BigIntType |
-            _: FloatType | _: DoubleType | _: DateType =>
-          minmaxColumns.add(columnName)
-          hasMinMax = true
-          true
-        case _ =>
-          false
+      if (minmaxAvailable(dataField.`type`())) {
+        minmaxColumns.add(columnName)
+        hasMinMax = true
+        true
+      } else {
+        false
       }
     }
 
@@ -95,15 +87,7 @@ object AggregatePushDownUtils {
     }
 
     if (hasMinMax) {
-      dataSplits.forall {
-        dataSplit =>
-          dataSplit.dataFiles().asScala.forall {
-            dataFile =>
-              // It means there are all column statistics when valueStatsCols == null
-              dataFile.valueStatsCols() == null ||
-              minmaxColumns.forall(dataFile.valueStatsCols().contains)
-          }
-      }
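+      // statsAvailable keeps the previous semantics: valueStatsCols == null means
+      // statistics exist for all columns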
+      dataSplits.forall(_.statsAvailable(minmaxColumns.toSet.asJava))
     } else if (hasCount) {
       dataSplits.forall(_.mergedRowCountAvailable())
     } else {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index 1edc64e55b..27c8c00846 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -285,57 +285,68 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
   test("Paimon pushDown: topN for append-only tables") {
     assume(gteqSpark3_3)
     spark.sql("""
-                |CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY (pt)
+                |CREATE TABLE T (pt INT, id INT, price BIGINT) PARTITIONED BY (pt)
                 |TBLPROPERTIES ('file-index.range-bitmap.columns'='id')
                 |""".stripMargin)
     Assertions.assertTrue(getScanBuilder().isInstanceOf[SupportsPushDownTopN])
 
     spark.sql("""
                 |INSERT INTO T VALUES
-                |(1, "a1", "2023"),
-                |(1, "a1", "2023"),
-                |(3, "d1", "2023"),
-                |(4, "e1", "2023")
+                |(1, 10, 100L),
+                |(2, 20, 200L),
+                |(3, 30, 300L)
                 |""".stripMargin)
     spark.sql("""
                 |INSERT INTO T VALUES
-                |(5, "a2", "2025"),
-                |(NULL, "b2", "2025"),
-                |(6, "c2", "2025"),
-                |(7, "d2", "2025"),
-                |(8, "e2", "2025")
+                |(4, 40, 400L),
+                |(5, 50, 500L)
                 |""".stripMargin)
     spark.sql("""
                 |INSERT INTO T VALUES
-                |(5, "a3", "2023"),
-                |(9, "a3", "2023"),
-                |(2, "c3", "2025"),
-                |(NULL, "b2", "2025")
+                |(6, NULL, 600L),
+                |(6, NULL, 600L),
+                |(6, 60, 600L),
+                |(7, NULL, 700L),
+                |(7, NULL, 700L),
+                |(7, 70, 700L)
+                |""".stripMargin)
+
+    // disable stats so the following inserts produce files without min/max statistics
+    spark.sql("""
+                |ALTER TABLE T SET TBLPROPERTIES ('metadata.stats-mode' = 'none')
+                |""".stripMargin)
+    spark.sql("""
+                |INSERT INTO T VALUES
+                |(8, 80, 800L),
+                |(9, 90, 900L)
+                |""".stripMargin)
+    spark.sql("""
+                |ALTER TABLE T UNSET TBLPROPERTIES ('metadata.stats-mode')
                 |""".stripMargin)
 
     // test ASC
     checkAnswer(
-      spark.sql("SELECT * FROM T ORDER BY id ASC NULLS LAST LIMIT 3"),
-      Row(1, "a1", "2023") :: Row(1, "a1", "2023") :: Row(2, "c3", "2025") :: 
Nil)
+      spark.sql("SELECT id FROM T ORDER BY id ASC NULLS LAST LIMIT 5"),
+      Row(10) :: Row(20) :: Row(30) :: Row(40) :: Row(50) :: Nil)
     checkAnswer(
-      spark.sql("SELECT * FROM T ORDER BY id ASC NULLS FIRST LIMIT 3"),
-      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(1, "a1", 
"2023") :: Nil)
+      spark.sql("SELECT id FROM T ORDER BY id ASC NULLS FIRST LIMIT 5"),
+      Row(null) :: Row(null) :: Row(null) :: Row(null) :: Row(10) :: Nil)
 
     // test DESC
     checkAnswer(
-      spark.sql("SELECT * FROM T ORDER BY id DESC NULLS LAST LIMIT 3"),
-      Row(9, "a3", "2023") :: Row(8, "e2", "2025") :: Row(7, "d2", "2025") :: 
Nil)
+      spark.sql("SELECT id FROM T ORDER BY id DESC NULLS LAST LIMIT 5"),
+      Row(90) :: Row(80) :: Row(70) :: Row(60) :: Row(50) :: Nil)
     checkAnswer(
-      spark.sql("SELECT * FROM T ORDER BY id DESC NULLS FIRST LIMIT 3"),
-      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(9, "a3", 
"2023") :: Nil)
+      spark.sql("SELECT id FROM T ORDER BY id DESC NULLS FIRST LIMIT 5"),
+      Row(null) :: Row(null) :: Row(null) :: Row(null) :: Row(90) :: Nil)
 
     // test with partition
     checkAnswer(
-      spark.sql("SELECT * FROM T WHERE pt='2023' ORDER BY id DESC LIMIT 3"),
-      Row(9, "a3", "2023") :: Row(5, "a3", "2023") :: Row(4, "e1", "2023") :: 
Nil)
+      spark.sql("SELECT id FROM T WHERE pt=6 ORDER BY id DESC LIMIT 5"),
+      Row(60) :: Row(null) :: Row(null) :: Nil)
     checkAnswer(
-      spark.sql("SELECT * FROM T WHERE pt='2025' ORDER BY id ASC LIMIT 3"),
-      Row(null, "b2", "2025") :: Row(null, "b2", "2025") :: Row(2, "c3", 
"2025") :: Nil)
+      spark.sql("SELECT id FROM T WHERE pt=6 ORDER BY id ASC LIMIT 3"),
+      Row(null) :: Row(null) :: Row(60) :: Nil)
 
     // test plan
     val df1 = spark.sql("SELECT * FROM T ORDER BY id DESC LIMIT 1")
