This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f5647255f4 [spark] Split pushedFilters into dataFilters and partitionFilters in ScanBuilder (#6817)
f5647255f4 is described below

commit f5647255f426e634a24aa0da24d59a7e36aec0f0
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Dec 16 15:09:14 2025 +0800

    [spark] Split pushedFilters into dataFilters and partitionFilters in ScanBuilder (#6817)
---
 .../apache/paimon/predicate/PredicateBuilder.java  |  15 --
 .../org/apache/paimon/predicate/SortValue.java     |  17 ++
 .../java/org/apache/paimon/predicate/TopN.java     |  15 ++
 .../paimon/operation/ManifestFileMerger.java       |   4 +-
 .../paimon/partition/PartitionPredicate.java       | 211 ++++++++++++++++++---
 .../PartitionValuesTimeExpireStrategy.java         |  16 ++
 .../paimon/table/format/FormatReadBuilder.java     |  17 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |  22 +--
 .../paimon/table/format/FormatTableScanTest.java   |   2 +-
 .../paimon/spark/FormatTableScanBuilder.scala      |  20 +-
 ...ePushDown.scala => PaimonBaseScanBuilder.scala} |  59 ++++--
 .../paimon/spark/PaimonFormatTableScan.scala       |   8 +-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |  12 +-
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  13 +-
 .../paimon/spark/PaimonSplitScanBuilder.scala      |  29 ---
 .../scala/org/apache/paimon/spark/PaimonScan.scala |  11 +-
 .../paimon/spark/ColumnPruningAndPushDown.scala    |  47 ++++-
 .../paimon/spark/FormatTableScanBuilder.scala      |  27 +--
 .../apache/paimon/spark/PaimonBasePushDown.scala   |  91 ---------
 .../org/apache/paimon/spark/PaimonBaseScan.scala   |  48 +----
 .../paimon/spark/PaimonBaseScanBuilder.scala       |  96 ++++++++--
 .../org/apache/paimon/spark/PaimonBatch.scala      |   1 -
 .../paimon/spark/PaimonFormatTableBaseScan.scala   |  17 +-
 .../paimon/spark/PaimonFormatTableScan.scala       |   8 +-
 .../org/apache/paimon/spark/PaimonLocalScan.scala  |  10 +-
 .../scala/org/apache/paimon/spark/PaimonScan.scala |  25 +--
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  43 ++---
 .../org/apache/paimon/spark/PaimonSplitScan.scala  |  24 ++-
 .../org/apache/paimon/spark/PaimonStatistics.scala |  73 +++----
 .../paimon/spark/sql/AnalyzeTableTestBase.scala    |  24 +--
 .../spark/sql/PaimonOptimizationTestBase.scala     |   5 +
 31 files changed, 537 insertions(+), 473 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
index 935ecad303..c8050efa16 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateBuilder.java
@@ -297,21 +297,6 @@ public class PredicateBuilder {
         return result;
     }
 
-    public static Pair<List<Predicate>, List<Predicate>> splitAndByPartition(
-            Predicate predicate, int[] fieldIdxToPartitionIdx) {
-        List<Predicate> partitionFilters = new ArrayList<>();
-        List<Predicate> nonPartitionFilters = new ArrayList<>();
-        for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
-            Optional<Predicate> mapped = transformFieldMapping(p, 
fieldIdxToPartitionIdx);
-            if (mapped.isPresent()) {
-                partitionFilters.add(mapped.get());
-            } else {
-                nonPartitionFilters.add(p);
-            }
-        }
-        return Pair.of(partitionFilters, nonPartitionFilters);
-    }
-
     public static List<Predicate> splitOr(@Nullable Predicate predicate) {
         if (predicate == null) {
             return Collections.emptyList();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java
index 61d7913510..c78819b1c6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/SortValue.java
@@ -21,6 +21,7 @@ package org.apache.paimon.predicate;
 import org.apache.paimon.utils.Preconditions;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 /** Represents a sort order. */
 public class SortValue implements Serializable {
@@ -55,6 +56,22 @@ public class SortValue implements Serializable {
                 "%s %s %s", field.name(), direction.toString(), 
nullOrdering.toString());
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SortValue sortValue = (SortValue) o;
+        return Objects.equals(field, sortValue.field)
+                && direction == sortValue.direction
+                && nullOrdering == sortValue.nullOrdering;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(field, direction, nullOrdering);
+    }
+
     /** A null order used in sorting expressions. */
     public enum NullOrdering {
         NULLS_FIRST("NULLS FIRST"),
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java 
b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
index 70b50c0c3a..20d34b86d2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/TopN.java
@@ -24,6 +24,7 @@ import org.apache.paimon.predicate.SortValue.SortDirection;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.ListUtils.isNullOrEmpty;
@@ -62,4 +63,18 @@ public class TopN implements Serializable {
         String sort = 
orders.stream().map(SortValue::toString).collect(Collectors.joining(", "));
         return String.format("Sort(%s), Limit(%s)", sort, limit);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TopN topN = (TopN) o;
+        return limit == topN.limit && Objects.equals(orders, topN.orders);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(orders, limit);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
index 51c7081916..fd8321de04 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestFileMerger.java
@@ -200,13 +200,13 @@ public class ManifestFileMerger {
 
         PartitionPredicate predicate;
         if (deleteEntries.isEmpty()) {
-            predicate = PartitionPredicate.alwaysFalse();
+            predicate = PartitionPredicate.ALWAYS_FALSE;
         } else {
             if (partitionType.getFieldCount() > 0) {
                 Set<BinaryRow> deletePartitions = 
computeDeletePartitions(deleteEntries);
                 predicate = PartitionPredicate.fromMultiple(partitionType, 
deletePartitions);
             } else {
-                predicate = PartitionPredicate.alwaysTrue();
+                predicate = PartitionPredicate.ALWAYS_TRUE;
             }
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index cb4e952664..d997ad2db7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -30,6 +30,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.statistics.FullSimpleColStatsCollector;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
@@ -37,13 +38,18 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 
+import static 
org.apache.paimon.predicate.PredicateBuilder.fieldIdxToPartitionIdx;
+import static 
org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 import static 
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
 import static 
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -102,41 +108,53 @@ public interface PartitionPredicate extends Serializable {
                 new RowDataToObjectArrayConverter(partitionType), partitions);
     }
 
-    static PartitionPredicate alwaysFalse() {
-        return new PartitionPredicate() {
-            @Override
-            public boolean test(BinaryRow part) {
-                return false;
-            }
+    /** Creates {@link PartitionPredicate} that combines multiple predicates 
using logical AND. */
+    @Nullable
+    static PartitionPredicate and(List<PartitionPredicate> predicates) {
+        if (predicates.isEmpty()) {
+            return null;
+        }
 
-            @Override
-            public boolean test(
-                    long rowCount,
-                    InternalRow minValues,
-                    InternalRow maxValues,
-                    InternalArray nullCounts) {
-                return false;
-            }
-        };
+        if (predicates.size() == 1) {
+            return predicates.get(0);
+        }
+
+        return new AndPartitionPredicate(predicates);
     }
 
-    static PartitionPredicate alwaysTrue() {
-        return new PartitionPredicate() {
-            @Override
-            public boolean test(BinaryRow part) {
-                return true;
-            }
+    PartitionPredicate ALWAYS_FALSE =
+            new PartitionPredicate() {
+                @Override
+                public boolean test(BinaryRow part) {
+                    return false;
+                }
 
-            @Override
-            public boolean test(
-                    long rowCount,
-                    InternalRow minValues,
-                    InternalRow maxValues,
-                    InternalArray nullCounts) {
-                return true;
-            }
-        };
-    }
+                @Override
+                public boolean test(
+                        long rowCount,
+                        InternalRow minValues,
+                        InternalRow maxValues,
+                        InternalArray nullCounts) {
+                    return false;
+                }
+            };
+
+    PartitionPredicate ALWAYS_TRUE =
+            new PartitionPredicate() {
+                @Override
+                public boolean test(BinaryRow part) {
+                    return true;
+                }
+
+                @Override
+                public boolean test(
+                        long rowCount,
+                        InternalRow minValues,
+                        InternalRow maxValues,
+                        InternalArray nullCounts) {
+                    return true;
+                }
+            };
 
     /** A {@link PartitionPredicate} using {@link Predicate}. */
     class DefaultPartitionPredicate implements PartitionPredicate {
@@ -166,6 +184,26 @@ public interface PartitionPredicate extends Serializable {
         public Predicate predicate() {
             return predicate;
         }
+
+        @Override
+        public String toString() {
+            return predicate.toString();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            DefaultPartitionPredicate that = (DefaultPartitionPredicate) o;
+            return Objects.equals(predicate, that.predicate);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(predicate);
+        }
     }
 
     /**
@@ -253,6 +291,82 @@ public interface PartitionPredicate extends Serializable {
         public Set<BinaryRow> partitions() {
             return partitions;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            MultiplePartitionPredicate that = (MultiplePartitionPredicate) o;
+            return fieldNum == that.fieldNum
+                    && Objects.equals(partitions, that.partitions)
+                    && Objects.deepEquals(min, that.min)
+                    && Objects.deepEquals(max, that.max);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(partitions, fieldNum, Arrays.hashCode(min), 
Arrays.hashCode(max));
+        }
+    }
+
+    /** AND-combines multiple {@link PartitionPredicate}s. */
+    class AndPartitionPredicate implements PartitionPredicate {
+
+        private static final long serialVersionUID = 1L;
+
+        private final List<PartitionPredicate> predicates;
+
+        private AndPartitionPredicate(List<PartitionPredicate> predicates) {
+            checkArgument(!predicates.isEmpty());
+            this.predicates = Collections.unmodifiableList(new 
ArrayList<>(predicates));
+        }
+
+        @Override
+        public boolean test(BinaryRow partition) {
+            for (PartitionPredicate predicate : predicates) {
+                if (!predicate.test(partition)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public boolean test(
+                long rowCount,
+                InternalRow minValues,
+                InternalRow maxValues,
+                InternalArray nullCounts) {
+            for (PartitionPredicate predicate : predicates) {
+                if (!predicate.test(rowCount, minValues, maxValues, 
nullCounts)) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            if (predicates.size() == 1) {
+                return predicates.get(0).toString();
+            }
+            return "AND" + "(" + predicates + ")";
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            AndPartitionPredicate that = (AndPartitionPredicate) o;
+            return Objects.equals(predicates, that.predicates);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(predicates);
+        }
     }
 
     static Predicate createPartitionPredicate(RowType rowType, Map<String, 
Object> partition) {
@@ -340,4 +454,39 @@ public interface PartitionPredicate extends Serializable {
         return fromMultiple(
                 partitionType, createBinaryPartitions(values, partitionType, 
defaultPartValue));
     }
+
+    static Pair<Optional<PartitionPredicate>, List<Predicate>>
+            splitPartitionPredicatesAndDataPredicates(
+                    Predicate dataPredicates, RowType tableType, List<String> 
partitionKeys) {
+        return splitPartitionPredicatesAndDataPredicates(
+                PredicateBuilder.splitAnd(dataPredicates), tableType, 
partitionKeys);
+    }
+
+    static Pair<Optional<PartitionPredicate>, List<Predicate>>
+            splitPartitionPredicatesAndDataPredicates(
+                    List<Predicate> dataPredicates, RowType tableType, 
List<String> partitionKeys) {
+        if (partitionKeys.isEmpty()) {
+            return Pair.of(Optional.empty(), dataPredicates);
+        }
+
+        RowType partitionType = tableType.project(partitionKeys);
+        int[] partitionIdx = fieldIdxToPartitionIdx(tableType, partitionKeys);
+
+        List<Predicate> partitionFilters = new ArrayList<>();
+        List<Predicate> nonPartitionFilters = new ArrayList<>();
+        for (Predicate p : dataPredicates) {
+            Optional<Predicate> mapped = transformFieldMapping(p, 
partitionIdx);
+            if (mapped.isPresent()) {
+                partitionFilters.add(mapped.get());
+            } else {
+                nonPartitionFilters.add(p);
+            }
+        }
+        PartitionPredicate partitionPredicate =
+                partitionFilters.isEmpty()
+                        ? null
+                        : PartitionPredicate.fromPredicate(
+                                partitionType, 
PredicateBuilder.and(partitionFilters));
+        return Pair.of(Optional.ofNullable(partitionPredicate), 
nonPartitionFilters);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
index 6685a1d28c..94e26d6f37 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
@@ -33,6 +33,7 @@ import java.time.LocalDateTime;
 import java.time.format.DateTimeParseException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -117,5 +118,20 @@ public class PartitionValuesTimeExpireStrategy extends 
PartitionExpireStrategy {
                 InternalArray nullCounts) {
             return true;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            PartitionValuesTimePredicate that = (PartitionValuesTimePredicate) 
o;
+            return Objects.equals(expireDateTime, that.expireDateTime);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(expireDateTime);
+        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 637fa070b5..ec344fb541 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -51,13 +51,13 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
+import static 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates;
 import static 
org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
-import static 
org.apache.paimon.predicate.PredicateBuilder.fieldIdxToPartitionIdx;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAndByPartition;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** {@link ReadBuilder} for {@link FormatTable}. */
@@ -147,13 +147,12 @@ public class FormatReadBuilder implements ReadBuilder {
     public TableScan newScan() {
         PartitionPredicate partitionFilter = this.partitionFilter;
         if (partitionFilter == null && this.filter != null && 
!table.partitionKeys().isEmpty()) {
-            int[] partitionIdx = fieldIdxToPartitionIdx(table.rowType(), 
table.partitionKeys());
-            List<Predicate> partitionFilters = splitAndByPartition(filter, 
partitionIdx).getLeft();
-            if (!partitionFilters.isEmpty()) {
-                RowType partitionType = 
table.rowType().project(table.partitionKeys());
-                partitionFilter =
-                        PartitionPredicate.fromPredicate(
-                                partitionType, 
PredicateBuilder.and(partitionFilters));
+            Optional<PartitionPredicate> partitionPredicateOpt =
+                    splitPartitionPredicatesAndDataPredicates(
+                                    filter, table.rowType(), 
table.partitionKeys())
+                            .getLeft();
+            if (partitionPredicateOpt.isPresent()) {
+                partitionFilter = partitionPredicateOpt.get();
             }
         }
         return new FormatTableScan(table, partitionFilter, limit);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 63261f8c64..15f662d3ac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -70,6 +70,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -78,7 +79,7 @@ import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAndByPartition;
+import static 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates;
 
 /** Implementation of {@link SnapshotReader}. */
 public class SnapshotReaderImpl implements SnapshotReader {
@@ -228,19 +229,14 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
 
     @Override
     public SnapshotReader withFilter(Predicate predicate) {
-        int[] fieldIdxToPartitionIdx =
-                PredicateBuilder.fieldIdxToPartitionIdx(
-                        tableSchema.logicalRowType(), 
tableSchema.partitionKeys());
-        Pair<List<Predicate>, List<Predicate>> partitionAndNonPartitionFilter =
-                splitAndByPartition(predicate, fieldIdxToPartitionIdx);
-        List<Predicate> partitionFilters = 
partitionAndNonPartitionFilter.getLeft();
-        List<Predicate> nonPartitionFilters = 
partitionAndNonPartitionFilter.getRight();
-        if (partitionFilters.size() > 0) {
-            scan.withPartitionFilter(PredicateBuilder.and(partitionFilters));
+        Pair<Optional<PartitionPredicate>, List<Predicate>> pair =
+                splitPartitionPredicatesAndDataPredicates(
+                        predicate, tableSchema.logicalRowType(), 
tableSchema.partitionKeys());
+        if (pair.getLeft().isPresent()) {
+            scan.withPartitionFilter(pair.getLeft().get());
         }
-
-        if (nonPartitionFilters.size() > 0) {
-            nonPartitionFilterConsumer.accept(scan, 
PredicateBuilder.and(nonPartitionFilters));
+        if (!pair.getRight().isEmpty()) {
+            nonPartitionFilterConsumer.accept(scan, 
PredicateBuilder.and(pair.getRight()));
         }
         return this;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
index c2d45f1c62..1c9a3d6e7c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatTableScanTest.java
@@ -115,7 +115,7 @@ public class FormatTableScanTest {
     void testComputeScanPathAndLevelNoPartitionKeys() {
         List<String> partitionKeys = Collections.emptyList();
         RowType partitionType = RowType.of();
-        PartitionPredicate partitionFilter = PartitionPredicate.alwaysTrue();
+        PartitionPredicate partitionFilter = PartitionPredicate.ALWAYS_TRUE;
 
         Pair<Path, Integer> result =
                 FormatTableScan.computeScanPathAndLevel(
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
index 9be731924d..2ae94d65f8 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
@@ -19,25 +19,9 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.table.FormatTable
-import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.connector.read.{SupportsPushDownRequiredColumns, 
SupportsRuntimeFiltering}
-import org.apache.spark.sql.types.StructType
-
-import java.util.{List => JList}
-
-case class FormatTableScanBuilder(table: FormatTable)
-  extends PaimonBasePushDown
-  with SupportsPushDownRequiredColumns {
-
-  override protected var partitionKeys: JList[String] = table.partitionKeys()
-  override protected var rowType: RowType = table.rowType()
-  protected var requiredSchema: StructType = 
SparkTypeUtils.fromPaimonRowType(rowType)
+case class FormatTableScanBuilder(table: FormatTable) extends 
PaimonBaseScanBuilder {
 
   override def build(): PaimonFormatTableScan =
-    PaimonFormatTableScan(table, requiredSchema, pushedPaimonPredicates, None)
-
-  override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
-  }
+    PaimonFormatTableScan(table, requiredSchema, pushedPartitionFilters, 
pushedDataFilters)
 }
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
similarity index 51%
rename from 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
rename to 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index c99066343e..806ce90e28 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,60 +18,85 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.partition.PartitionPredicate
+import 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
+import org.apache.paimon.table.Table
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.connector.read.SupportsPushDownFilters
+import org.apache.spark.sql.connector.read.{SupportsPushDownFilters, 
SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
 
 import java.util.{List => JList}
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-/** Base trait for Paimon scan filter push down. */
-trait PaimonBasePushDown extends SupportsPushDownFilters {
+/** Base Scan builder. */
+abstract class PaimonBaseScanBuilder
+  extends SupportsPushDownRequiredColumns
+  with SupportsPushDownFilters {
 
-  protected var partitionKeys: JList[String]
-  protected var rowType: RowType
+  val table: Table
+  val partitionKeys: JList[String] = table.partitionKeys()
+  val rowType: RowType = table.rowType()
 
   private var pushedSparkFilters = Array.empty[Filter]
-  protected var pushedPaimonPredicates: Array[Predicate] = Array.empty
-  protected var reservedFilters: Array[Filter] = Array.empty
   protected var hasPostScanPredicates = false
 
+  protected var pushedPartitionFilters: Array[PartitionPredicate] = Array.empty
+  protected var pushedDataFilters: Array[Predicate] = Array.empty
+
+  protected var requiredSchema: StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType())
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    this.requiredSchema = requiredSchema
+  }
+
   /**
    * Pushes down filters, and returns filters that need to be evaluated after 
scanning. <p> Rows
    * should be returned from the data source if and only if all the filters 
match. That is, filters
    * must be interpreted as ANDed together.
    */
   override def pushFilters(filters: Array[Filter]): Array[Filter] = {
-    val pushable = mutable.ArrayBuffer.empty[(Filter, Predicate)]
+    val pushable = mutable.ArrayBuffer.empty[Filter]
+    val pushablePartitionDataFilters = mutable.ArrayBuffer.empty[Predicate]
+    val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
     val postScan = mutable.ArrayBuffer.empty[Filter]
-    val reserved = mutable.ArrayBuffer.empty[Filter]
 
     val converter = new SparkFilterConverter(rowType)
-    val visitor = new PartitionPredicateVisitor(partitionKeys)
+    val partitionPredicateVisitor = new 
PartitionPredicateVisitor(partitionKeys)
     filters.foreach {
       filter =>
         val predicate = converter.convertIgnoreFailure(filter)
         if (predicate == null) {
           postScan.append(filter)
         } else {
-          pushable.append((filter, predicate))
-          if (predicate.visit(visitor)) {
-            reserved.append(filter)
+          pushable.append(filter)
+          if (predicate.visit(partitionPredicateVisitor)) {
+            pushablePartitionDataFilters.append(predicate)
           } else {
+            pushableDataFilters.append(predicate)
             postScan.append(filter)
           }
         }
     }
 
     if (pushable.nonEmpty) {
-      this.pushedSparkFilters = pushable.map(_._1).toArray
-      this.pushedPaimonPredicates = pushable.map(_._2).toArray
+      this.pushedSparkFilters = pushable.toArray
+    }
+    if (pushablePartitionDataFilters.nonEmpty) {
+      val pair = splitPartitionPredicatesAndDataPredicates(
+        pushablePartitionDataFilters.asJava,
+        rowType,
+        partitionKeys)
+      assert(pair.getRight.isEmpty)
+      assert(pair.getLeft.isPresent)
+      this.pushedPartitionFilters = Array(pair.getLeft.get())
     }
-    if (reserved.nonEmpty) {
-      this.reservedFilters = reserved.toArray
+    if (pushableDataFilters.nonEmpty) {
+      this.pushedDataFilters = pushableDataFilters.toArray
     }
     if (postScan.nonEmpty) {
       this.hasPostScanPredicates = true
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index a7bbe4e94e..e9734d238b 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.table.FormatTable
 
@@ -33,9 +34,10 @@ import scala.collection.JavaConverters._
 case class PaimonFormatTableScan(
     table: FormatTable,
     requiredSchema: StructType,
-    filters: Seq[Predicate],
-    override val pushDownLimit: Option[Int])
-  extends PaimonFormatTableBaseScan(table, requiredSchema, filters, 
pushDownLimit)
+    pushedPartitionFilters: Seq[PartitionPredicate],
+    pushedDataFilters: Seq[Predicate],
+    override val pushedLimit: Option[Int] = None)
+  extends PaimonFormatTableBaseScan
   with SupportsRuntimeFiltering
   with ScanHelper {
 
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index e8fe9a9c40..68b89d5ecc 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.table.InnerTable
 
@@ -32,13 +33,12 @@ import scala.collection.JavaConverters._
 case class PaimonScan(
     table: InnerTable,
     requiredSchema: StructType,
-    filters: Seq[Predicate],
-    reservedFilters: Seq[Filter],
-    override val pushDownLimit: Option[Int],
-    // no usage, just for compile compatibility
-    override val pushDownTopN: Option[TopN],
+    pushedPartitionFilters: Seq[PartitionPredicate],
+    pushedDataFilters: Seq[Predicate],
+    override val pushedLimit: Option[Int] = None,
+    override val pushedTopN: Option[TopN] = None,
     bucketedScanDisabled: Boolean = true)
-  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
+  extends PaimonBaseScan(table)
   with SupportsRuntimeFiltering {
 
   override def filterAttributes(): Array[NamedReference] = {
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 0ba6dca6c0..21ab46dabc 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -19,21 +19,12 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.table.InnerTable
-import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.sources.Filter
 
-import java.util.{List => JList}
-
-class PaimonScanBuilder(table: InnerTable)
-  extends PaimonBaseScanBuilder(table)
-  with PaimonBasePushDown {
-
-  override protected var partitionKeys: JList[String] = table.partitionKeys()
-  override protected var rowType: RowType = table.rowType()
+class PaimonScanBuilder(val table: InnerTable) extends PaimonBaseScanBuilder {
 
   override def build(): Scan = {
-    PaimonScan(table, requiredSchema, pushedPaimonPredicates, reservedFilters, 
None, pushDownTopN)
+    PaimonScan(table, requiredSchema, pushedPartitionFilters, 
pushedDataFilters)
   }
 }
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonSplitScanBuilder.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonSplitScanBuilder.scala
deleted file mode 100644
index ede18b8cc9..0000000000
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonSplitScanBuilder.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.table.KnownSplitsTable
-
-import org.apache.spark.sql.connector.read.Scan
-
-class PaimonSplitScanBuilder(table: KnownSplitsTable) extends 
PaimonScanBuilder(table) {
-  override def build(): Scan = {
-    PaimonSplitScan(table, table.splits(), requiredSchema, 
pushedPaimonPredicates)
-  }
-}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 0fb515ac2c..7c0a4d0c17 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
@@ -34,12 +35,12 @@ import scala.collection.JavaConverters._
 case class PaimonScan(
     table: InnerTable,
     requiredSchema: StructType,
-    filters: Seq[Predicate],
-    reservedFilters: Seq[Filter],
-    override val pushDownLimit: Option[Int],
-    override val pushDownTopN: Option[TopN],
+    pushedPartitionFilters: Seq[PartitionPredicate],
+    pushedDataFilters: Seq[Predicate],
+    override val pushedLimit: Option[Int],
+    override val pushedTopN: Option[TopN],
     bucketedScanDisabled: Boolean = false)
-  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
+  extends PaimonBaseScan(table)
   with SupportsRuntimeFiltering
   with SupportsReportPartitioning {
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index 11f4d8ad4b..98daf2eaef 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -19,7 +19,8 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder, TopN}
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
 import org.apache.paimon.table.{SpecialFields, Table}
@@ -30,12 +31,20 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.types.StructType
 
+import scala.collection.JavaConverters._
+
 trait ColumnPruningAndPushDown extends Scan with Logging {
+
   def table: Table
+
+  // Column pruning
   def requiredSchema: StructType
-  def filters: Seq[Predicate]
-  def pushDownLimit: Option[Int] = None
-  def pushDownTopN: Option[TopN] = None
+
+  // Push down
+  def pushedPartitionFilters: Seq[PartitionPredicate]
+  def pushedDataFilters: Seq[Predicate]
+  def pushedLimit: Option[Int] = None
+  def pushedTopN: Option[TopN] = None
 
   val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
 
@@ -82,12 +91,14 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
 
   lazy val readBuilder: ReadBuilder = {
     val _readBuilder = table.newReadBuilder().withReadType(readTableRowType)
-    if (filters.nonEmpty) {
-      val pushedPredicate = PredicateBuilder.and(filters: _*)
-      _readBuilder.withFilter(pushedPredicate)
+    if (pushedPartitionFilters.nonEmpty) {
+      
_readBuilder.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
+    }
+    if (pushedDataFilters.nonEmpty) {
+      _readBuilder.withFilter(pushedDataFilters.asJava)
     }
-    pushDownLimit.foreach(_readBuilder.withLimit)
-    pushDownTopN.foreach(_readBuilder.withTopN)
+    pushedLimit.foreach(_readBuilder.withLimit)
+    pushedTopN.foreach(_readBuilder.withTopN)
     _readBuilder.dropStats()
   }
 
@@ -104,4 +115,22 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
     }
     _readSchema
   }
+
+  override def description(): String = {
+    val pushedPartitionFiltersStr = if (pushedPartitionFilters.nonEmpty) {
+      ", PartitionFilters: [" + pushedPartitionFilters.mkString(",") + "]"
+    } else {
+      ""
+    }
+    val pushedDataFiltersStr = if (pushedDataFilters.nonEmpty) {
+      ", DataFilters: [" + pushedDataFilters.mkString(",") + "]"
+    } else {
+      ""
+    }
+    s"${getClass.getSimpleName}: [${table.name}]" +
+      pushedPartitionFiltersStr +
+      pushedDataFiltersStr +
+      pushedTopN.map(topN => s", TopN: [$topN]").getOrElse("") +
+      pushedLimit.map(limit => s", Limit: [$limit]").getOrElse("")
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
index df9921d563..b92476a085 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableScanBuilder.scala
@@ -19,27 +19,14 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.table.FormatTable
-import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.connector.read.{ScanBuilder, 
SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.types.StructType
-
-import java.util.{List => JList}
-
-case class FormatTableScanBuilder(table: FormatTable)
-  extends ScanBuilder
-  with PaimonBasePushDown
-  with SupportsPushDownRequiredColumns {
-
-  override protected var partitionKeys: JList[String] = table.partitionKeys()
-  override protected var rowType: RowType = table.rowType()
-
-  protected var requiredSchema: StructType = 
SparkTypeUtils.fromPaimonRowType(rowType)
+case class FormatTableScanBuilder(table: FormatTable) extends 
PaimonBaseScanBuilder {
 
   override def build(): PaimonFormatTableScan =
-    PaimonFormatTableScan(table, requiredSchema, pushedPaimonPredicates, 
pushDownLimit)
-
-  override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
-  }
+    PaimonFormatTableScan(
+      table,
+      requiredSchema,
+      pushedPartitionFilters,
+      pushedDataFilters,
+      pushedLimit)
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
deleted file mode 100644
index 3f10202607..0000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBasePushDown.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
-import org.apache.paimon.types.RowType
-
-import org.apache.spark.sql.PaimonUtils
-import org.apache.spark.sql.connector.expressions.filter.{Predicate => 
SparkPredicate}
-import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, 
SupportsPushDownV2Filters}
-import org.apache.spark.sql.sources.Filter
-
-import java.util.{List => JList}
-
-import scala.collection.mutable
-
-/** Base trait for Paimon scan push down. */
-trait PaimonBasePushDown extends SupportsPushDownV2Filters with 
SupportsPushDownLimit {
-
-  protected var partitionKeys: JList[String]
-  protected var rowType: RowType
-
-  private var pushedSparkPredicates = Array.empty[SparkPredicate]
-  protected var pushedPaimonPredicates: Array[Predicate] = Array.empty
-  protected var reservedFilters: Array[Filter] = Array.empty
-  protected var hasPostScanPredicates = false
-  protected var pushDownLimit: Option[Int] = None
-
-  override def pushPredicates(predicates: Array[SparkPredicate]): 
Array[SparkPredicate] = {
-    val pushable = mutable.ArrayBuffer.empty[(SparkPredicate, Predicate)]
-    val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
-    val reserved = mutable.ArrayBuffer.empty[Filter]
-
-    val converter = SparkV2FilterConverter(rowType)
-    val visitor = new PartitionPredicateVisitor(partitionKeys)
-    predicates.foreach {
-      predicate =>
-        converter.convert(predicate) match {
-          case Some(paimonPredicate) =>
-            pushable.append((predicate, paimonPredicate))
-            if (paimonPredicate.visit(visitor)) {
-              // We need to filter the stats using filter instead of predicate.
-              PaimonUtils.filterV2ToV1(predicate).map(reserved.append(_))
-            } else {
-              postScan.append(predicate)
-            }
-          case None =>
-            postScan.append(predicate)
-        }
-    }
-
-    if (pushable.nonEmpty) {
-      this.pushedSparkPredicates = pushable.map(_._1).toArray
-      this.pushedPaimonPredicates = pushable.map(_._2).toArray
-    }
-    if (reserved.nonEmpty) {
-      this.reservedFilters = reserved.toArray
-    }
-    if (postScan.nonEmpty) {
-      this.hasPostScanPredicates = true
-    }
-    postScan.toArray
-  }
-
-  override def pushedPredicates: Array[SparkPredicate] = {
-    pushedSparkPredicates
-  }
-
-  override def pushLimit(limit: Int): Boolean = {
-    // It is safe, since we will do nothing if it is the primary table and the 
split is not `rawConvertible`
-    pushDownLimit = Some(limit)
-    // just make the best effort to push down limit
-    false
-  }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 7ae11b7ce3..f39f209415 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -19,10 +19,8 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.annotation.VisibleForTesting
-import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.spark.sources.PaimonMicroBatchStream
-import org.apache.paimon.spark.statistics.StatisticsHelper
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.stats
 import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
@@ -31,24 +29,17 @@ import org.apache.paimon.table.source.{InnerTableScan, 
Split}
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, 
SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 import java.util.Optional
 
 import scala.collection.JavaConverters._
 
-abstract class PaimonBaseScan(
-    table: InnerTable,
-    requiredSchema: StructType,
-    filters: Seq[Predicate],
-    reservedFilters: Seq[Filter],
-    pushDownLimit: Option[Int])
+abstract class PaimonBaseScan(table: InnerTable)
   extends Scan
   with SupportsReportStatistics
   with ScanHelper
-  with ColumnPruningAndPushDown
-  with StatisticsHelper {
+  with ColumnPruningAndPushDown {
 
   protected var inputPartitions: Seq[PaimonInputPartition] = _
 
@@ -59,8 +50,7 @@ abstract class PaimonBaseScan(
   private lazy val paimonMetricsRegistry: SparkMetricRegistry = 
SparkMetricRegistry()
 
   lazy val requiredStatsSchema: StructType = {
-    val fieldNames =
-      readTableRowType.getFields.asScala.map(_.name) ++ 
reservedFilters.flatMap(_.references)
+    val fieldNames = readTableRowType.getFields.asScala.map(_.name)
     StructType(tableSchema.filter(field => fieldNames.contains(field.name)))
   }
 
@@ -96,13 +86,7 @@ abstract class PaimonBaseScan(
   }
 
   override def estimateStatistics(): Statistics = {
-    val stats = PaimonStatistics(this)
-    // When using paimon stats, we need to perform additional FilterEstimation 
with reservedFilters on stats.
-    if (stats.paimonStatsEnabled && reservedFilters.nonEmpty) {
-      filterStatistics(stats, reservedFilters)
-    } else {
-      stats
-    }
+    PaimonStatistics(this)
   }
 
   override def supportedCustomMetrics: Array[CustomMetric] = {
@@ -137,28 +121,4 @@ abstract class PaimonBaseScan(
       case _ =>
     }
   }
-
-  override def description(): String = {
-    val pushedFiltersStr = if (filters.nonEmpty) {
-      ", PushedFilters: [" + filters.mkString(",") + "]"
-    } else {
-      ""
-    }
-
-    val reservedFiltersStr = if (reservedFilters.nonEmpty) {
-      ", ReservedFilters: [" + reservedFilters.mkString(",") + "]"
-    } else {
-      ""
-    }
-
-    val pushedTopNFilterStr = if (pushDownTopN.nonEmpty) {
-      s", PushedTopNFilter: [${pushDownTopN.get.toString}]"
-    } else {
-      ""
-    }
-
-    s"PaimonScan: [${table.name}]" +
-      pushedFiltersStr + reservedFiltersStr + pushedTopNFilterStr +
-      pushDownLimit.map(limit => s", Limit: [$limit]").getOrElse("")
-  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index cacaf651fc..71ab544b71 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -18,24 +18,98 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.{Predicate, TopN}
-import org.apache.paimon.table.InnerTable
+import org.apache.paimon.partition.PartitionPredicate
+import 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
+import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN}
+import org.apache.paimon.table.Table
+import org.apache.paimon.types.RowType
 
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, 
SupportsPushDownRequiredColumns}
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.connector.expressions.filter.{Predicate => 
SparkPredicate}
+import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, 
SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
 import org.apache.spark.sql.types.StructType
 
-abstract class PaimonBaseScanBuilder(table: InnerTable)
-  extends ScanBuilder
-  with SupportsPushDownRequiredColumns
-  with Logging {
+import java.util.{List => JList}
 
-  protected var requiredSchema: StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType())
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/** Base Scan builder. */
+abstract class PaimonBaseScanBuilder
+  extends SupportsPushDownRequiredColumns
+  with SupportsPushDownV2Filters
+  with SupportsPushDownLimit {
+
+  val table: Table
+  val partitionKeys: JList[String] = table.partitionKeys()
+  val rowType: RowType = table.rowType()
+
+  private var pushedSparkPredicates = Array.empty[SparkPredicate]
+  protected var hasPostScanPredicates = false
 
-  protected var pushDownTopN: Option[TopN] = None
+  protected var pushedPartitionFilters: Array[PartitionPredicate] = Array.empty
+  protected var pushedDataFilters: Array[Predicate] = Array.empty
+  protected var pushedLimit: Option[Int] = None
+  protected var pushedTopN: Option[TopN] = None
+
+  protected var requiredSchema: StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType())
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
     this.requiredSchema = requiredSchema
   }
+
+  override def pushPredicates(predicates: Array[SparkPredicate]): 
Array[SparkPredicate] = {
+    val pushable = mutable.ArrayBuffer.empty[SparkPredicate]
+    val pushablePartitionDataFilters = mutable.ArrayBuffer.empty[Predicate]
+    val pushableDataFilters = mutable.ArrayBuffer.empty[Predicate]
+    val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
+
+    val converter = SparkV2FilterConverter(rowType)
+    val partitionPredicateVisitor = new 
PartitionPredicateVisitor(partitionKeys)
+    predicates.foreach {
+      predicate =>
+        converter.convert(predicate) match {
+          case Some(paimonPredicate) =>
+            pushable.append(predicate)
+            if (paimonPredicate.visit(partitionPredicateVisitor)) {
+              pushablePartitionDataFilters.append(paimonPredicate)
+            } else {
+              pushableDataFilters.append(paimonPredicate)
+              postScan.append(predicate)
+            }
+          case None =>
+            postScan.append(predicate)
+        }
+    }
+
+    if (pushable.nonEmpty) {
+      this.pushedSparkPredicates = pushable.toArray
+    }
+    if (pushablePartitionDataFilters.nonEmpty) {
+      val pair = splitPartitionPredicatesAndDataPredicates(
+        pushablePartitionDataFilters.asJava,
+        rowType,
+        partitionKeys)
+      assert(pair.getRight.isEmpty)
+      assert(pair.getLeft.isPresent)
+      this.pushedPartitionFilters = Array(pair.getLeft.get())
+    }
+    if (pushableDataFilters.nonEmpty) {
+      this.pushedDataFilters = pushableDataFilters.toArray
+    }
+    if (postScan.nonEmpty) {
+      this.hasPostScanPredicates = true
+    }
+    postScan.toArray
+  }
+
+  override def pushedPredicates: Array[SparkPredicate] = {
+    pushedSparkPredicates
+  }
+
+  override def pushLimit(limit: Int): Boolean = {
+    // It is safe, since we will do nothing if it is the primary table and the 
split is not `rawConvertible`
+    pushedLimit = Some(limit)
+    // just make the best effort to push down limit
+    false
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
index d9137a1250..1e8f415f30 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala
@@ -38,5 +38,4 @@ case class PaimonBatch(
 
   override def createReaderFactory(): PartitionReaderFactory =
     PaimonPartitionReaderFactory(readBuilder, metadataColumns, 
blobAsDescriptor)
-
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index a0dd474436..793e15540a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -18,22 +18,16 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.table.FormatTable
 import org.apache.paimon.table.source.Split
 
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{Batch, Statistics, 
SupportsReportStatistics}
-import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
 
 /** Base Scan implementation for [[FormatTable]]. */
-abstract class PaimonFormatTableBaseScan(
-    table: FormatTable,
-    requiredSchema: StructType,
-    filters: Seq[Predicate],
-    pushDownLimit: Option[Int])
+abstract class PaimonFormatTableBaseScan
   extends ColumnPruningAndPushDown
   with SupportsReportStatistics
   with ScanHelper {
@@ -82,13 +76,4 @@ abstract class PaimonFormatTableBaseScan(
       PaimonResultedTableFilesTaskMetric(filesCount)
     )
   }
-
-  override def description(): String = {
-    val pushedFiltersStr = if (filters.nonEmpty) {
-      ", PushedFilters: [" + filters.mkString(",") + "]"
-    } else {
-      ""
-    }
-    s"PaimonFormatTableScan: [${table.name}]" + pushedFiltersStr
-  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index 8fb64999b9..aac086c2a7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.table.FormatTable
 
@@ -27,6 +28,7 @@ import org.apache.spark.sql.types.StructType
 case class PaimonFormatTableScan(
     table: FormatTable,
     requiredSchema: StructType,
-    filters: Seq[Predicate],
-    override val pushDownLimit: Option[Int])
-  extends PaimonFormatTableBaseScan(table, requiredSchema, filters, 
pushDownLimit) {}
+    pushedPartitionFilters: Seq[PartitionPredicate],
+    pushedDataFilters: Seq[Predicate],
+    override val pushedLimit: Option[Int])
+  extends PaimonFormatTableBaseScan {}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
index 1f4e88e8d1..b4f8b3b785 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.catalyst.InternalRow
@@ -30,15 +30,15 @@ case class PaimonLocalScan(
     rows: Array[InternalRow],
     readSchema: StructType,
     table: Table,
-    filters: Array[Predicate])
+    pushedPartitionFilters: Seq[PartitionPredicate])
   extends LocalScan {
 
   override def description(): String = {
-    val pushedFiltersStr = if (filters.nonEmpty) {
-      ", PushedFilters: [" + filters.mkString(",") + "]"
+    val pushedPartitionFiltersStr = if (pushedPartitionFilters.nonEmpty) {
+      ", PartitionFilters: [" + pushedPartitionFilters.mkString(",") + "]"
     } else {
       ""
     }
-    s"PaimonLocalScan: [${table.name}]" + pushedFiltersStr
+    s"PaimonLocalScan: [${table.name}]" + pushedPartitionFiltersStr
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 08f0029989..f8a6e89df8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions.BucketFunctionType
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.spark.commands.BucketExpression.quote
 import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
@@ -29,7 +30,6 @@ import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.connector.expressions.filter.{Predicate => 
SparkPredicate}
 import org.apache.spark.sql.connector.read.{SupportsReportOrdering, 
SupportsReportPartitioning, SupportsRuntimeV2Filtering}
 import 
org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, 
Partitioning, UnknownPartitioning}
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
@@ -37,19 +37,12 @@ import scala.collection.JavaConverters._
 case class PaimonScan(
     table: InnerTable,
     requiredSchema: StructType,
-    filters: Seq[Predicate],
-    reservedFilters: Seq[Filter],
-    override val pushDownLimit: Option[Int],
-    override val pushDownTopN: Option[TopN],
+    pushedPartitionFilters: Seq[PartitionPredicate],
+    pushedDataFilters: Seq[Predicate],
+    override val pushedLimit: Option[Int],
+    override val pushedTopN: Option[TopN],
     bucketedScanDisabled: Boolean = false)
-  extends PaimonScanCommon(
-    table,
-    requiredSchema,
-    filters,
-    reservedFilters,
-    pushDownLimit,
-    pushDownTopN,
-    bucketedScanDisabled)
+  extends PaimonScanCommon(table, requiredSchema, bucketedScanDisabled)
   with SupportsRuntimeV2Filtering {
   def disableBucketedScan(): PaimonScan = {
     copy(bucketedScanDisabled = true)
@@ -87,12 +80,8 @@ case class PaimonScan(
 abstract class PaimonScanCommon(
     table: InnerTable,
     requiredSchema: StructType,
-    filters: Seq[Predicate],
-    reservedFilters: Seq[Filter],
-    override val pushDownLimit: Option[Int],
-    override val pushDownTopN: Option[TopN],
     bucketedScanDisabled: Boolean = false)
-  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
+  extends PaimonBaseScan(table)
   with SupportsReportPartitioning
   with SupportsReportOrdering {
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 186cb116f5..c61ce42b80 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -18,33 +18,26 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate._
 import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
 import org.apache.paimon.spark.aggregate.AggregatePushDownUtils.tryPushdownAggregation
 import org.apache.paimon.table.{FileStoreTable, InnerTable}
-import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.connector.expressions
 import org.apache.spark.sql.connector.expressions.{NamedReference, SortOrder}
 import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
 import org.apache.spark.sql.connector.read._
 
-import java.util.{List => JList}
-
 import scala.collection.JavaConverters._
 
-class PaimonScanBuilder(table: InnerTable)
-  extends PaimonBaseScanBuilder(table)
-  with PaimonBasePushDown
+class PaimonScanBuilder(val table: InnerTable)
+  extends PaimonBaseScanBuilder
   with SupportsPushDownAggregates
   with SupportsPushDownTopN {
 
   private var localScan: Option[Scan] = None
 
-  override protected var partitionKeys: JList[String] = table.partitionKeys()
-  override protected var rowType: RowType = table.rowType()
-
   override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
     if (hasPostScanPredicates) {
       return false
@@ -86,7 +79,7 @@ class PaimonScanBuilder(table: InnerTable)
         })
       .toList
 
-    pushDownTopN = Some(new TopN(sorts.asJava, limit))
+    pushedTopN = Some(new TopN(sorts.asJava, limit))
 
     // just make the best effort to push down TopN
     false
@@ -115,15 +108,15 @@ class PaimonScanBuilder(table: InnerTable)
     }
 
     val readBuilder = table.newReadBuilder
-    if (pushedPaimonPredicates.nonEmpty) {
-      val pushedPartitionPredicate = PredicateBuilder.and(pushedPaimonPredicates.toList.asJava)
-      readBuilder.withFilter(pushedPartitionPredicate)
+    if (pushedPartitionFilters.nonEmpty) {
+      readBuilder.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.toList.asJava))
     }
+    assert(pushedDataFilters.isEmpty)
 
     tryPushdownAggregation(table.asInstanceOf[FileStoreTable], aggregation, readBuilder) match {
       case Some(agg) =>
         localScan = Some(
-          PaimonLocalScan(agg.result(), agg.resultSchema(), table, pushedPaimonPredicates)
+          PaimonLocalScan(agg.result(), agg.resultSchema(), table, pushedPartitionFilters)
         )
         true
       case None => false
@@ -131,16 +124,16 @@ class PaimonScanBuilder(table: InnerTable)
   }
 
   override def build(): Scan = {
-    if (localScan.isDefined) {
-      localScan.get
-    } else {
-      PaimonScan(
-        table,
-        requiredSchema,
-        pushedPaimonPredicates,
-        reservedFilters,
-        pushDownLimit,
-        pushDownTopN)
+    localScan match {
+      case Some(scan) => scan
+      case None =>
+        PaimonScan(
+          table,
+          requiredSchema,
+          pushedPartitionFilters,
+          pushedDataFilters,
+          pushedLimit,
+          pushedTopN)
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 2fc8d9d5f8..f47081e5b2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
@@ -27,9 +27,15 @@ import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan}
 import org.apache.spark.sql.types.StructType
 
-class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonScanBuilder(table) {
+class PaimonSplitScanBuilder(val table: KnownSplitsTable) extends PaimonBaseScanBuilder {
+
   override def build(): Scan = {
-    PaimonSplitScan(table, table.splits(), requiredSchema, pushedPaimonPredicates)
+    PaimonSplitScan(
+      table,
+      table.splits(),
+      requiredSchema,
+      pushedPartitionFilters,
+      pushedDataFilters)
   }
 }
 
@@ -38,7 +44,8 @@ case class PaimonSplitScan(
     table: InnerTable,
     dataSplits: Array[DataSplit],
     requiredSchema: StructType,
-    filters: Seq[Predicate])
+    pushedPartitionFilters: Seq[PartitionPredicate],
+    pushedDataFilters: Seq[Predicate])
   extends ColumnPruningAndPushDown
   with ScanHelper {
 
@@ -64,13 +71,4 @@ case class PaimonSplitScan(
       PaimonResultedTableFilesTaskMetric(filesCount)
     )
   }
-
-  override def description(): String = {
-    val pushedFiltersStr = if (filters.nonEmpty) {
-      ", PushedFilters: [" + filters.mkString(",") + "]"
-    } else {
-      ""
-    }
-    s"PaimonSplitScan: [${table.name}]" + pushedFiltersStr
-  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
index 8dd4649330..cd8dcd1165 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
@@ -18,9 +18,9 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.spark.data.SparkInternalRow
+import org.apache.paimon.stats
 import org.apache.paimon.stats.ColStats
-import org.apache.paimon.types.{DataField, DataType, RowType}
+import org.apache.paimon.types.{DataField, DataType}
 
 import org.apache.spark.sql.PaimonUtils
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
@@ -34,62 +34,45 @@ import scala.collection.JavaConverters._
 
 case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics {
 
-  private lazy val rowCount: Long = scan.lazyInputPartitions.map(_.rowCount()).sum
+  import PaimonImplicits._
 
-  private lazy val scannedTotalSize: Long = rowCount * scan.readSchema().defaultSize
+  private lazy val paimonStats: Option[stats.Statistics] = scan.statistics
 
-  private lazy val paimonStats = if (scan.statistics.isPresent) scan.statistics.get() else null
+  private lazy val rowCount: Long = scan.lazyInputPartitions.map(_.rowCount()).sum
 
-  lazy val paimonStatsEnabled: Boolean = {
-    paimonStats != null &&
-    paimonStats.mergedRecordSize().isPresent &&
-    paimonStats.mergedRecordCount().isPresent
+  private lazy val scannedTotalSize: Long = {
+    val readSchemaSize =
+      SparkTypeUtils.toPaimonRowType(scan.readSchema()).getFields.asScala.map(getSizeForField).sum
+    val sizeInBytes = rowCount * readSchemaSize
+    // Avoid return 0 bytes if there are some valid rows.
+    // Avoid return too small size in bytes which may less than row count,
+    // note the compression ratio on disk is usually bigger than memory.
+    Math.max(sizeInBytes, rowCount)
   }
 
   private def getSizeForField(field: DataField): Long = {
-    Option(paimonStats.colStats().get(field.name()))
-      .map(_.avgLen())
-      .filter(_.isPresent)
-      .map(_.getAsLong)
-      .getOrElse(field.`type`().defaultSize().toLong)
-  }
-
-  private def getSizeForRow(schema: RowType): Long = {
-    schema.getFields.asScala.map(field => getSizeForField(field)).sum
-  }
-
-  override def sizeInBytes(): OptionalLong = {
-    if (!paimonStatsEnabled) {
-      return OptionalLong.of(scannedTotalSize)
+    paimonStats match {
+      case Some(stats) =>
+        val colStat = stats.colStats().get(field.name())
+        if (colStat != null && colStat.avgLen().isPresent) {
+          colStat.avgLen().getAsLong
+        } else {
+          field.`type`().defaultSize().toLong
+        }
+      case _ =>
+        field.`type`().defaultSize().toLong
     }
-
-    val wholeSchemaSize = getSizeForRow(scan.tableRowType)
-
-    val requiredDataSchemaSize =
-      scan.readTableRowType.getFields.asScala.map(field => getSizeForField(field)).sum
-    val requiredDataSizeInBytes =
-      paimonStats.mergedRecordSize().getAsLong * (requiredDataSchemaSize.toDouble / wholeSchemaSize)
-
-    val metadataSchemaSize =
-      scan.metadataColumns.map(field => getSizeForField(field.toPaimonDataField)).sum
-    val metadataSizeInBytes = paimonStats.mergedRecordCount().getAsLong * metadataSchemaSize
-
-    val sizeInBytes = (requiredDataSizeInBytes + metadataSizeInBytes).toLong
-    // Avoid return 0 bytes if there are some valid rows.
-    // Avoid return too small size in bytes which may less than row count,
-    // note the compression ratio on disk is usually bigger than memory.
-    val normalized = Math.max(sizeInBytes, paimonStats.mergedRecordCount().getAsLong)
-    OptionalLong.of(normalized)
   }
 
-  override def numRows(): OptionalLong =
-    if (paimonStatsEnabled) paimonStats.mergedRecordCount() else OptionalLong.of(rowCount)
+  override def numRows(): OptionalLong = OptionalLong.of(rowCount)
+
+  override def sizeInBytes(): OptionalLong = OptionalLong.of(scannedTotalSize)
 
   override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
     val requiredFields = scan.requiredStatsSchema.fieldNames
     val resultMap = new java.util.HashMap[NamedReference, ColumnStatistics]()
-    if (paimonStatsEnabled) {
-      val paimonColStats = paimonStats.colStats()
+    if (paimonStats.isDefined) {
+      val paimonColStats = paimonStats.get.colStats()
       scan.tableRowType.getFields.asScala
         .filter {
           field => requiredFields.contains(field.name) && paimonColStats.containsKey(field.name())
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
index 040c90170f..c6860779cd 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
@@ -439,21 +439,17 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
     // paimon will reserve partition filter and not return it to spark, we need to ensure stats are filtered correctly.
     // partition push down hit
     var sql = "SELECT * FROM T WHERE pt < 1"
-    Assertions.assertEquals(
-      if (gteqSpark3_4) 0L else 4L,
-      getScanStatistic(sql).rowCount.get.longValue)
+    Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue)
     checkAnswer(spark.sql(sql), Nil)
 
     // partition push down hit and select without it
     sql = "SELECT id FROM T WHERE pt < 1"
-    Assertions.assertEquals(
-      if (gteqSpark3_4) 0L else 4L,
-      getScanStatistic(sql).rowCount.get.longValue)
+    Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue)
     checkAnswer(spark.sql(sql), Nil)
 
     // partition push down not hit
     sql = "SELECT * FROM T WHERE id < 1"
-    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue)
+    Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue)
     checkAnswer(spark.sql(sql), Nil)
   }
 
@@ -470,12 +466,17 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
           sql("INSERT INTO T VALUES (1, 'a', '1'), (2, 'b', '1'), (3, 'c', '2'), (4, 'd', '3')")
           sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
 
-          // For col type such as char, varchar that don't have min and max, filter estimation on stats has no effect.
           var sqlText = "SELECT * FROM T WHERE pt < '1'"
-          Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue)
+          Assertions.assertEquals(
+            if (gteqSpark3_4 && partitionType == "char(10)") 4L else 0L,
+            getScanStatistic(sqlText).rowCount.get.longValue)
+          checkAnswer(sql(sqlText), Nil)
 
           sqlText = "SELECT id FROM T WHERE pt < '1'"
-          Assertions.assertEquals(4L, getScanStatistic(sqlText).rowCount.get.longValue)
+          Assertions.assertEquals(
+            if (gteqSpark3_4 && partitionType == "char(10)") 4L else 0L,
+            getScanStatistic(sqlText).rowCount.get.longValue)
+          checkAnswer(sql(sqlText), Nil)
         }
       })
   }
@@ -525,7 +526,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
     spark.sql("ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
     val withColStat = checkStatistics()
 
-    assert(withColStat == noColStat)
+    assert(withColStat != noColStat)
   }
 
   test("Query a non-existent catalog") {
@@ -553,5 +554,4 @@ abstract class AnalyzeTableTestBase extends 
PaimonSparkTestBase {
       .get
     relation.computeStats()
   }
-
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index e55a610a02..d417b5f405 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -102,8 +102,13 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
       spark.sql(s"CREATE TABLE T (id INT, name STRING, pt STRING) PARTITIONED BY (pt)")
       spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1'), (3, 'c', 'p2')")
 
+      // data filter and partition filter
       val sqlText = "SELECT * FROM T WHERE id = 1 AND pt = 'p1' LIMIT 1"
       Assertions.assertEquals(getPaimonScan(sqlText), getPaimonScan(sqlText))
+
+      // topN
+      val sqlText2 = "SELECT id FROM T ORDER BY id ASC NULLS LAST LIMIT 5"
+      Assertions.assertEquals(getPaimonScan(sqlText2), getPaimonScan(sqlText2))
     }
   }
 

Reply via email to