This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fc86abec18 [core] Refactor row id pushdown to 
DataEvolutionFileStoreScan
fc86abec18 is described below

commit fc86abec181a036c2a867daf6306adffb5d67808
Author: JingsongLi <[email protected]>
AuthorDate: Sat Dec 27 20:34:54 2025 +0800

    [core] Refactor row id pushdown to DataEvolutionFileStoreScan
---
 .../shortcodes/generated/core_configuration.html   |  6 ---
 .../main/java/org/apache/paimon/CoreOptions.java   | 12 -----
 .../paimon/predicate/RowIdPredicateVisitor.java    | 47 +++++++++--------
 .../main/java/org/apache/paimon/utils/Range.java   | 40 +++++++--------
 .../apache/paimon/utils/RoaringNavigableMap64.java | 33 +-----------
 .../paimon/globalindex/DataEvolutionBatchScan.java | 42 ++++++++++++++-
 .../paimon/operation/AbstractFileStoreScan.java    |  6 +--
 .../operation/DataEvolutionFileStoreScan.java      | 31 ------------
 .../apache/paimon/operation/ManifestsReader.java   | 50 ++++++++++++++----
 .../org/apache/paimon/schema/SchemaValidation.java |  6 ---
 .../paimon/table/source/ReadBuilderImpl.java       | 54 +-------------------
 .../paimon/spark/PaimonBaseScanBuilder.scala       | 15 +++---
 .../paimon/spark/PaimonBaseScanBuilder.scala       | 14 ++---
 .../paimon/spark/sql/RowIdPushDownTestBase.scala   | 59 +++++++++-------------
 14 files changed, 162 insertions(+), 253 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 36950aa916..fc7a0c3839 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1037,12 +1037,6 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td>String</td>
             <td>Time field for record level expire. It supports the following 
types: `timestamps in seconds with INT`,`timestamps in seconds with BIGINT`, 
`timestamps in milliseconds with BIGINT` or `timestamp`.</td>
         </tr>
-        <tr>
-            <td><h5>row-id-push-down.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to enable row id push down for scan. Currently, only 
the data evolution table supports row id push down.</td>
-        </tr>
         <tr>
             <td><h5>row-tracking.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 324608988d..20aa5182d6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2146,14 +2146,6 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to try upgrading the data files after 
overwriting a primary key table.");
 
-    public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
-            key("row-id-push-down.enabled")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether to enable row id push down for scan."
-                                    + " Currently, only the data evolution 
table supports row id push down.");
-
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -3338,10 +3330,6 @@ public class CoreOptions implements Serializable {
         return options.get(OVERWRITE_UPGRADE);
     }
 
-    public boolean rowIdPushDownEnabled() {
-        return options.get(ROW_ID_PUSH_DOWN_ENABLED);
-    }
-
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
index d8ae6f6a6d..953737dd9f 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/RowIdPredicateVisitor.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.Range;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.paimon.table.SpecialFields.ROW_ID;
 
@@ -40,10 +41,10 @@ import static org.apache.paimon.table.SpecialFields.ROW_ID;
  *       AND _ROW_ID IN (1, 2)}).
  * </ul>
  */
-public class RowIdPredicateVisitor implements PredicateVisitor<List<Range>> {
+public class RowIdPredicateVisitor implements 
PredicateVisitor<Optional<List<Range>>> {
 
     @Override
-    public List<Range> visit(LeafPredicate predicate) {
+    public Optional<List<Range>> visit(LeafPredicate predicate) {
         if (ROW_ID.name().equals(predicate.fieldName())) {
             LeafFunction function = predicate.function();
             if (function instanceof Equal || function instanceof In) {
@@ -53,57 +54,55 @@ public class RowIdPredicateVisitor implements 
PredicateVisitor<List<Range>> {
                 }
                // NOTE(review): toRanges (renamed from getRangesFromList) no longer
                // sorts/dedupes; assumes rowIds is sorted with no overlap — confirm
-                return Range.getRangesFromList(rowIds);
+                return Optional.of(Range.toRanges(rowIds));
             }
         }
-        return null;
+        return Optional.empty();
     }
 
     @Override
-    public List<Range> visit(CompoundPredicate predicate) {
+    public Optional<List<Range>> visit(CompoundPredicate predicate) {
         CompoundPredicate.Function function = predicate.function();
-        List<Range> rowIds = null;
+        Optional<List<Range>> rowIds = Optional.empty();
         // `And` means we should get the intersection of all children.
         if (function instanceof And) {
             for (Predicate child : predicate.children()) {
-                List<Range> childList = child.visit(this);
-                if (childList == null) {
+                Optional<List<Range>> childList = child.visit(this);
+                if (!childList.isPresent()) {
                     continue;
                 }
 
-                if (rowIds == null) {
-                    rowIds = childList;
-                } else {
-                    rowIds = Range.and(rowIds, childList);
-                }
+                rowIds =
+                        rowIds.map(ranges -> Optional.of(Range.and(ranges, 
childList.get())))
+                                .orElse(childList);
 
                 // shortcut for intersection
-                if (rowIds.isEmpty()) {
+                if (rowIds.get().isEmpty()) {
                     return rowIds;
                 }
             }
         } else if (function instanceof Or) {
             // `Or` means we should get the union of all children
-            rowIds = new ArrayList<>();
+            rowIds = Optional.of(new ArrayList<>());
             for (Predicate child : predicate.children()) {
-                List<Range> childList = child.visit(this);
-                if (childList == null) {
-                    return null;
+                Optional<List<Range>> childList = child.visit(this);
+                if (!childList.isPresent()) {
+                    return Optional.empty();
                 }
 
-                rowIds.addAll(childList);
-                rowIds = Range.sortAndMergeOverlap(rowIds, true);
+                rowIds.get().addAll(childList.get());
+                rowIds = Optional.of(Range.sortAndMergeOverlap(rowIds.get(), 
true));
             }
         } else {
-            // unexpected function type, just return null
-            return null;
+            // unexpected function type, just return empty
+            return Optional.empty();
         }
         return rowIds;
     }
 
     @Override
-    public List<Range> visit(TransformPredicate predicate) {
+    public Optional<List<Range>> visit(TransformPredicate predicate) {
         // do not support transform predicate now.
-        return null;
+        return Optional.empty();
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 7d5afd025f..1cd80bc56b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -22,9 +22,9 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 /** Range represents from (inclusive) and to (inclusive). */
 public class Range implements Serializable {
@@ -178,31 +178,29 @@ public class Range implements Serializable {
         return result;
     }
 
-    public static List<Range> getRangesFromList(List<Long> origLongs) {
-        if (origLongs == null || origLongs.isEmpty()) {
-            return Collections.emptyList();
+    public static List<Range> toRanges(Iterable<Long> ids) {
+        List<Range> ranges = new ArrayList<>();
+        Iterator<Long> iterator = ids.iterator();
+
+        if (!iterator.hasNext()) {
+            return ranges;
         }
 
-        List<Long> longs = 
origLongs.stream().distinct().sorted().collect(Collectors.toList());
-
-        ArrayList<Range> ranges = new ArrayList<>();
-        Long rangeStart = null;
-        Long rangeEnd = null;
-        for (Long cur : longs) {
-            if (rangeStart == null) {
-                rangeStart = cur;
-                rangeEnd = cur;
-            } else if (rangeEnd == cur - 1) {
-                rangeEnd = cur;
-            } else {
+        long rangeStart = iterator.next();
+        long rangeEnd = rangeStart;
+
+        while (iterator.hasNext()) {
+            long current = iterator.next();
+            if (current != rangeEnd + 1) {
+                // Save the current range and start a new one
                 ranges.add(new Range(rangeStart, rangeEnd));
-                rangeStart = cur;
-                rangeEnd = cur;
+                rangeStart = current;
             }
+            rangeEnd = current;
         }
-        if (rangeStart != null) {
-            ranges.add(new Range(rangeStart, rangeEnd));
-        }
+        // Add the last range
+        ranges.add(new Range(rangeStart, rangeEnd));
+
         return ranges;
     }
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
index f07b7c6afa..bec44f3fb0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.utils;
 
-import org.apache.paimon.annotation.VisibleForTesting;
-
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import java.io.ByteArrayInputStream;
@@ -27,7 +25,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -135,42 +132,16 @@ public class RoaringNavigableMap64 implements 
Iterable<Long> {
         }
     }
 
-    @VisibleForTesting
     /**
      * Converts this bitmap to a list of contiguous ranges.
      *
      * <p>This is useful for interoperability with APIs that expect 
List&lt;Range&gt;.
      */
     public List<Range> toRangeList() {
-        List<Range> ranges = new ArrayList<>();
-        Iterator<Long> iterator = roaring64NavigableMap.iterator();
-
-        if (!iterator.hasNext()) {
-            return ranges;
-        }
-
-        long rangeStart = iterator.next();
-        long rangeEnd = rangeStart;
-
-        while (iterator.hasNext()) {
-            long current = iterator.next();
-            if (current == rangeEnd + 1) {
-                // Extend the current range
-                rangeEnd = current;
-            } else {
-                // Save the current range and start a new one
-                ranges.add(new Range(rangeStart, rangeEnd));
-                rangeStart = current;
-                rangeEnd = current;
-            }
-        }
-        // Add the last range
-        ranges.add(new Range(rangeStart, rangeEnd));
-
-        return ranges;
+        // TODO Optimize this to avoid iterating over all ids
+        return Range.toRanges(roaring64NavigableMap::iterator);
     }
 
-    @VisibleForTesting
     public static RoaringNavigableMap64 bitmapOf(long... dat) {
         RoaringNavigableMap64 roaringBitmap64 = new RoaringNavigableMap64();
         for (long ele : dat) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 6d036d6cb0..586fd61b00 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -24,7 +24,10 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.RowIdPredicateVisitor;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.predicate.VectorSearch;
 import org.apache.paimon.table.FileStoreTable;
@@ -47,6 +50,7 @@ import java.util.Objects;
 import java.util.Optional;
 
 import static 
org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
+import static org.apache.paimon.table.SpecialFields.ROW_ID;
 
 /** Scan for data evolution table. */
 public class DataEvolutionBatchScan implements DataTableScan {
@@ -71,11 +75,43 @@ public class DataEvolutionBatchScan implements 
DataTableScan {
 
     @Override
     public InnerTableScan withFilter(Predicate predicate) {
+        if (predicate == null) {
+            return this;
+        }
+
+        predicate.visit(new 
RowIdPredicateVisitor()).ifPresent(this::withRowRanges);
+        predicate = removeRowIdFilter(predicate);
         this.filter = predicate;
         batchScan.withFilter(predicate);
         return this;
     }
 
+    private Predicate removeRowIdFilter(Predicate filter) {
+        if (filter instanceof LeafPredicate
+                && ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) 
{
+            return null;
+        } else if (filter instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
+
+            List<Predicate> newChildren = new ArrayList<>();
+            for (Predicate child : compoundPredicate.children()) {
+                Predicate newChild = removeRowIdFilter(child);
+                if (newChild != null) {
+                    newChildren.add(newChild);
+                }
+            }
+
+            if (newChildren.isEmpty()) {
+                return null;
+            } else if (newChildren.size() == 1) {
+                return newChildren.get(0);
+            } else {
+                return new CompoundPredicate(compoundPredicate.function(), 
newChildren);
+            }
+        }
+        return filter;
+    }
+
     @Override
     public InnerTableScan withVectorSearch(VectorSearch vectorSearch) {
         this.vectorSearch = vectorSearch;
@@ -157,9 +193,13 @@ public class DataEvolutionBatchScan implements 
DataTableScan {
 
     @Override
     public InnerTableScan withRowRanges(List<Range> rowRanges) {
+        if (rowRanges == null) {
+            return this;
+        }
+
         this.pushedRowRanges = rowRanges;
         if (globalIndexResult != null) {
-            throw new IllegalStateException("");
+            throw new IllegalStateException("Cannot push row ranges after 
global index eval.");
         }
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index d261b050b6..284a2ef195 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -243,6 +243,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     @Override
     public FileStoreScan withRowRanges(List<Range> rowRanges) {
         this.rowRanges = rowRanges;
+        manifestsReader.withRowRanges(rowRanges);
         return this;
     }
 
@@ -274,7 +275,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         ManifestsReader.Result manifestsResult = readManifests();
         Snapshot snapshot = manifestsResult.snapshot;
         List<ManifestFileMeta> manifests = manifestsResult.filteredManifests;
-        manifests = postFilterManifests(manifests);
 
         Iterator<ManifestEntry> iterator = readManifestEntries(manifests, 
false);
         if (supportsLimitPushManifestEntries()) {
@@ -434,10 +434,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     /** Note: Keep this thread-safe. */
     protected abstract boolean filterByStats(ManifestEntry entry);
 
-    protected List<ManifestFileMeta> 
postFilterManifests(List<ManifestFileMeta> manifests) {
-        return manifests;
-    }
-
     protected boolean postFilterManifestEntriesEnabled() {
         return false;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 8f31cb7774..c2695ce682 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.DataEvolutionArray;
 import org.apache.paimon.reader.DataEvolutionRow;
@@ -103,36 +102,6 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
         return this;
     }
 
-    @Override
-    protected List<ManifestFileMeta> 
postFilterManifests(List<ManifestFileMeta> manifests) {
-        if (rowRanges == null || rowRanges.isEmpty()) {
-            return manifests;
-        }
-        return 
manifests.stream().filter(this::filterManifestByRowIds).collect(Collectors.toList());
-    }
-
-    private boolean filterManifestByRowIds(ManifestFileMeta manifest) {
-        if (rowRanges == null || rowRanges.isEmpty()) {
-            return true;
-        }
-
-        Long min = manifest.minRowId();
-        Long max = manifest.maxRowId();
-        if (min == null || max == null) {
-            return true;
-        }
-
-        Range manifestRowRange = new Range(min, max);
-
-        for (Range expected : rowRanges) {
-            if (Range.intersection(manifestRowRange, expected) != null) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
     @Override
     public FileStoreScan withReadType(RowType readType) {
         if (readType != null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
index b3b89e72aa..fed68d6997 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ManifestsReader.java
@@ -28,6 +28,7 @@ import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
@@ -54,6 +55,7 @@ public class ManifestsReader {
     @Nullable private Integer specifiedLevel = null;
     @Nullable private PartitionPredicate partitionFilter = null;
     @Nullable private BiFilter<Integer, Integer> levelMinMaxFilter = null;
+    @Nullable protected List<Range> rowRanges;
 
     public ManifestsReader(
             RowType partitionType,
@@ -106,6 +108,11 @@ public class ManifestsReader {
         return this;
     }
 
+    public ManifestsReader withRowRanges(List<Range> rowRanges) {
+        this.rowRanges = rowRanges;
+        return this;
+    }
+
     @Nullable
     public PartitionPredicate partitionFilter() {
         return partitionFilter;
@@ -146,6 +153,26 @@ public class ManifestsReader {
         }
     }
 
+    private boolean filterManifestByRowRanges(ManifestFileMeta manifest) {
+        if (rowRanges == null) {
+            return true;
+        }
+        Long min = manifest.minRowId();
+        Long max = manifest.maxRowId();
+        if (min == null || max == null) {
+            return true;
+        }
+
+        Range manifestRowRange = new Range(min, max);
+
+        for (Range expected : rowRanges) {
+            if (Range.intersection(manifestRowRange, expected) != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     /** Note: Keep this thread-safe. */
     private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
         Integer minBucket = manifest.minBucket();
@@ -172,17 +199,22 @@ public class ManifestsReader {
             }
         }
 
-        if (partitionFilter == null) {
-            return true;
+        if (partitionFilter != null) {
+            SimpleStats stats = manifest.partitionStats();
+            if (!partitionFilter.test(
+                    manifest.numAddedFiles() + manifest.numDeletedFiles(),
+                    stats.minValues(),
+                    stats.maxValues(),
+                    stats.nullCounts())) {
+                return false;
+            }
+        }
+
+        if (!filterManifestByRowRanges(manifest)) {
+            return false;
         }
 
-        SimpleStats stats = manifest.partitionStats();
-        return partitionFilter == null
-                || partitionFilter.test(
-                        manifest.numAddedFiles() + manifest.numDeletedFiles(),
-                        stats.minValues(),
-                        stats.maxValues(),
-                        stats.nullCounts());
+        return true;
     }
 
     /** Result for reading manifest files. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index d78bba4e8f..abbc2e4912 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -653,12 +653,6 @@ public class SchemaValidation {
                     "Data evolution config must disabled with 
deletion-vectors.enabled");
         }
 
-        if (options.rowIdPushDownEnabled()) {
-            checkArgument(
-                    options.dataEvolutionEnabled(),
-                    "Row id push down config must enabled with 
data-evolution.enabled");
-        }
-
         List<String> blobNames =
                 
BlobType.splitBlob(schema.logicalRowType()).getRight().getFieldNames();
         if (!blobNames.isEmpty()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 48765b7c50..c81dfd8e01 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -19,15 +19,11 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.data.variant.VariantAccessInfoUtils;
 import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.predicate.RowIdPredicateVisitor;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.predicate.VectorSearch;
 import org.apache.paimon.table.InnerTable;
@@ -37,14 +33,12 @@ import org.apache.paimon.utils.Range;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
-import static org.apache.paimon.table.SpecialFields.ROW_ID;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Implementation for {@link ReadBuilder}. */
@@ -71,7 +65,7 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     private @Nullable RowType readType;
     private @Nullable VariantAccessInfo[] variantAccessInfo;
-    public @Nullable @VisibleForTesting List<Range> rowRanges;
+    private @Nullable List<Range> rowRanges;
     private @Nullable VectorSearch vectorSearch;
 
     private boolean dropStats = false;
@@ -105,55 +99,9 @@ public class ReadBuilderImpl implements ReadBuilder {
         } else {
             this.filter = PredicateBuilder.and(this.filter, filter);
         }
-        calculateRowRanges(this.filter);
-        this.filter = removeRowIdFilter(this.filter);
         return this;
     }
 
-    private void calculateRowRanges(Predicate filter) {
-        if (filter == null) {
-            return;
-        }
-
-        RowIdPredicateVisitor visitor = new RowIdPredicateVisitor();
-        List<Range> ranges = filter.visit(visitor);
-        // When rowRanges is not null, filter data based on rowRanges.
-        // If rowRanges is empty, it means no data will be read.
-        if (ranges != null) {
-            withRowRanges(ranges);
-        }
-    }
-
-    private Predicate removeRowIdFilter(Predicate filter) {
-        if (filter == null) {
-            return null;
-        }
-
-        if (filter instanceof LeafPredicate
-                && ROW_ID.name().equals(((LeafPredicate) filter).fieldName())) 
{
-            return null;
-        } else if (filter instanceof CompoundPredicate) {
-            CompoundPredicate compoundPredicate = (CompoundPredicate) filter;
-
-            List<Predicate> newChildren = new ArrayList<>();
-            for (Predicate child : compoundPredicate.children()) {
-                Predicate newChild = removeRowIdFilter(child);
-                if (newChild != null) {
-                    newChildren.add(newChild);
-                }
-            }
-
-            if (newChildren.isEmpty()) {
-                return null;
-            } else if (newChildren.size() == 1) {
-                return newChildren.get(0);
-            } else {
-                return new CompoundPredicate(compoundPredicate.function(), 
newChildren);
-            }
-        }
-        return filter;
-    }
-
     @Override
     public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
         if (partitionSpec != null) {
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 06b826b72a..41a1d552f1 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -22,15 +22,15 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
-import org.apache.paimon.table.{InnerTable, Table}
-import org.apache.paimon.table.SpecialFields.ROW_ID
-import org.apache.paimon.types.{DataField, DataTypes, RowType}
+import org.apache.paimon.table.SpecialFields.rowTypeWithRowTracking
+import org.apache.paimon.table.Table
+import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.connector.read.{SupportsPushDownFilters, 
SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
-import java.util.{ArrayList, List => JList}
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -69,11 +69,8 @@ abstract class PaimonBaseScanBuilder
     val postScan = mutable.ArrayBuffer.empty[Filter]
 
     var newRowType = rowType
-    if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
-      val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
-      dataFieldsWithRowId.add(
-        new DataField(rowType.getFieldCount, ROW_ID.name(), 
DataTypes.BIGINT()))
-      newRowType = rowType.copy(dataFieldsWithRowId)
+    if (coreOptions.rowTrackingEnabled() && 
coreOptions.dataEvolutionEnabled()) {
+      newRowType = rowTypeWithRowTracking(newRowType);
     }
     val converter = new SparkFilterConverter(newRowType)
     val partitionPredicateVisitor = new 
PartitionPredicateVisitor(partitionKeys)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 4ac5dd051b..8179f504b3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -22,15 +22,14 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.partition.PartitionPredicate
 import 
org.apache.paimon.partition.PartitionPredicate.splitPartitionPredicatesAndDataPredicates
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, TopN}
-import org.apache.paimon.table.{InnerTable, Table}
-import org.apache.paimon.table.SpecialFields.ROW_ID
-import org.apache.paimon.types.{DataField, DataTypes, RowType}
+import org.apache.paimon.table.{SpecialFields, Table}
+import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.connector.expressions.filter.{Predicate => 
SparkPredicate}
 import org.apache.spark.sql.connector.read.{SupportsPushDownLimit, 
SupportsPushDownRequiredColumns, SupportsPushDownV2Filters}
 import org.apache.spark.sql.types.StructType
 
-import java.util.{ArrayList, List => JList}
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -67,11 +66,8 @@ abstract class PaimonBaseScanBuilder
     val postScan = mutable.ArrayBuffer.empty[SparkPredicate]
 
     var newRowType = rowType
-    if (table.isInstanceOf[InnerTable] && coreOptions.rowIdPushDownEnabled()) {
-      val dataFieldsWithRowId = new ArrayList[DataField](rowType.getFields)
-      dataFieldsWithRowId.add(
-        new DataField(rowType.getFieldCount, ROW_ID.name(), 
DataTypes.BIGINT()))
-      newRowType = rowType.copy(dataFieldsWithRowId)
+    if (coreOptions.rowTrackingEnabled() && 
coreOptions.dataEvolutionEnabled()) {
+      newRowType = SpecialFields.rowTypeWithRowTracking(newRowType);
     }
     val converter = SparkV2FilterConverter(newRowType)
     val partitionPredicateVisitor = new 
PartitionPredicateVisitor(partitionKeys)
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
index 1e9f2d11b5..bb1d3e8717 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowIdPushDownTestBase.scala
@@ -32,61 +32,48 @@ class RowIdPushDownTestBase extends PaimonSparkTestBase {
     withTable("t") {
       sql("CREATE TABLE t (a INT, b INT, c STRING) TBLPROPERTIES " +
         "('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 
'row-id-push-down.enabled'='true')")
-      sql("INSERT INTO t VALUES (1, 1, '1'), (2, 2, '2'), (3, 3, '3'), (4, 4, 
'4')")
+
+      // first manifest
+      sql("INSERT INTO t VALUES (0, 0, '0'), (1, 1, '1'), (2, 2, '2'), (3, 3, 
'3')")
+
+      // second manifest
+      sql("INSERT INTO t VALUES (4, 4, '4'), (5, 5, '5')")
+
+      // delete second manifest
+      // after push down, should never read it
+      val table = loadTable("t")
+      val manifests =
+        
table.store().manifestListFactory().create().readAllManifests(table.latestSnapshot().get)
+      val secondManifest = manifests.asScala.find(m => m.minRowId() == 4L).get
+      
table.store().manifestFileFactory().create().delete(secondManifest.fileName())
 
       // 1.LeafPredicate
-      assertResult(Seq(new Range(0L, 0L)))(
-        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0").readBuilder
-          .asInstanceOf[ReadBuilderImpl]
-          .rowRanges
-          .asScala)
       checkAnswer(
         sql("SELECT * FROM t WHERE _ROW_ID = 0"),
-        Seq(Row(1, 1, "1"))
+        Seq(Row(0, 0, "0"))
       )
-      assertResult(Seq(new Range(0L, 1L), new Range(3L, 3L)))(
-        getPaimonScan("SELECT * FROM t WHERE _ROW_ID IN (0, 1, 3)").readBuilder
-          .asInstanceOf[ReadBuilderImpl]
-          .rowRanges
-          .asScala)
       checkAnswer(
         sql("SELECT * FROM t WHERE _ROW_ID IN (0, 1, 3)"),
-        Seq(Row(1, 1, "1"), Row(2, 2, "2"), Row(4, 4, "4"))
+        Seq(Row(0, 0, "0"), Row(1, 1, "1"), Row(3, 3, "3"))
       )
-      assertResult(Seq(new Range(4L, 5L)))(
-        getPaimonScan("SELECT * FROM t WHERE _ROW_ID IN (4, 5)").readBuilder
-          .asInstanceOf[ReadBuilderImpl]
-          .rowRanges
-          .asScala)
       checkAnswer(
-        sql("SELECT * FROM t WHERE _ROW_ID IN (4, 5)"),
+        sql("SELECT * FROM t WHERE _ROW_ID IN (1, 6)"),
+        Seq(Row(1, 1, "1"))
+      )
+      checkAnswer(
+        sql("SELECT * FROM t WHERE _ROW_ID IN (6, 7)"),
         Seq()
       )
 
       // 2.CompoundPredicate
-      assertResult(Seq(new Range(0, 0)))(
-        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (0, 
1)").readBuilder
-          .asInstanceOf[ReadBuilderImpl]
-          .rowRanges
-          .asScala)
       checkAnswer(
         sql("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (0, 1)"),
-        Seq(Row(1, 1, "1"))
+        Seq(Row(0, 0, "0"))
       )
-      assertResult(Seq(new Range(0, 2)))(
-        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 OR _ROW_ID IN (1, 
2)").readBuilder
-          .asInstanceOf[ReadBuilderImpl]
-          .rowRanges
-          .asScala)
       checkAnswer(
         sql("SELECT * FROM t WHERE _ROW_ID = 0 OR _ROW_ID IN (1, 2)"),
-        Seq(Row(1, 1, "1"), Row(2, 2, "2"), Row(3, 3, "3"))
+        Seq(Row(0, 0, "0"), Row(1, 1, "1"), Row(2, 2, "2"))
       )
-      assertResult(Seq())(
-        getPaimonScan("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (1, 
2)").readBuilder
-          .asInstanceOf[ReadBuilderImpl]
-          .rowRanges
-          .asScala)
       checkAnswer(
         sql("SELECT * FROM t WHERE _ROW_ID = 0 AND _ROW_ID IN (1, 2)"),
         Seq()


Reply via email to