This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6298f04bb9 [core] Enable manifest filter in data evolution table 
(#6455)
6298f04bb9 is described below

commit 6298f04bb95788b93db15604369e6ee8c2fe0f8b
Author: YeJunHao <[email protected]>
AuthorDate: Sun Oct 26 13:58:45 2025 +0800

    [core] Enable manifest filter in data evolution table (#6455)
---
 .../apache/paimon/reader/DataEvolutionArray.java   | 228 +++++++++++++++++++++
 .../org/apache/paimon/reader/DataEvolutionRow.java |  13 ++
 .../paimon/operation/AbstractFileStoreScan.java    |   6 +-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   8 +-
 .../operation/DataEvolutionFileStoreScan.java      | 117 +++++++++++
 .../paimon/operation/KeyValueFileStoreScan.java    |   5 +
 .../table/source/DataEvolutionSplitGenerator.java  |  71 +++++--
 .../paimon/table/DataEvolutionTableTest.java       |   8 +
 8 files changed, 433 insertions(+), 23 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
new file mode 100644
index 0000000000..7a14a3ef9c
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+
+/** The array which is made up by several rows. */
+public class DataEvolutionArray implements InternalArray {
+
+    private final InternalArray[] rows;
+    private final int[] rowOffsets;
+    private final int[] fieldOffsets;
+
+    public DataEvolutionArray(int rowNumber, int[] rowOffsets, int[] 
fieldOffsets) {
+        this.rows = new InternalArray[rowNumber];
+        this.rowOffsets = rowOffsets;
+        this.fieldOffsets = fieldOffsets;
+    }
+
+    public void setRow(int pos, InternalArray row) {
+        if (pos >= rows.length) {
+            throw new IndexOutOfBoundsException(
+                    "Position " + pos + " is out of bounds for rows size " + 
rows.length);
+        } else {
+            rows[pos] = row;
+        }
+    }
+
+    public void setRows(InternalArray[] rows) {
+        if (rows.length != this.rows.length) {
+            throw new IllegalArgumentException(
+                    "The length of input rows "
+                            + rows.length
+                            + " is not equal to the expected length "
+                            + this.rows.length);
+        }
+        for (int i = 0; i < rows.length; i++) {
+            setRow(i, rows[i]);
+        }
+    }
+
+    private InternalArray chooseArray(int pos) {
+        return rows[(rowOffsets[pos])];
+    }
+
+    private int offsetInRow(int pos) {
+        return fieldOffsets[pos];
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        if (rowOffsets[pos] == -1) {
+            return true;
+        }
+        return chooseArray(pos).isNullAt(offsetInRow(pos));
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return chooseArray(pos).getBoolean(offsetInRow(pos));
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return chooseArray(pos).getByte(offsetInRow(pos));
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return chooseArray(pos).getShort(offsetInRow(pos));
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return chooseArray(pos).getInt(offsetInRow(pos));
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return chooseArray(pos).getLong(offsetInRow(pos));
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return chooseArray(pos).getFloat(offsetInRow(pos));
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return chooseArray(pos).getDouble(offsetInRow(pos));
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return chooseArray(pos).getString(offsetInRow(pos));
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        return chooseArray(pos).getDecimal(offsetInRow(pos), precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        return chooseArray(pos).getTimestamp(offsetInRow(pos), precision);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        return chooseArray(pos).getBinary(offsetInRow(pos));
+    }
+
+    @Override
+    public Variant getVariant(int pos) {
+        return chooseArray(pos).getVariant(offsetInRow(pos));
+    }
+
+    @Override
+    public Blob getBlob(int pos) {
+        return chooseArray(pos).getBlob(offsetInRow(pos));
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        return chooseArray(pos).getArray(offsetInRow(pos));
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        return chooseArray(pos).getMap(offsetInRow(pos));
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        return chooseArray(pos).getRow(offsetInRow(pos), numFields);
+    }
+
+    @Override
+    public int size() {
+        return rowOffsets.length;
+    }
+
+    @Override
+    public boolean[] toBooleanArray() {
+        boolean[] result = new boolean[rowOffsets.length];
+        for (int i = 0; i < rowOffsets.length; i++) {
+            result[i] = getBoolean(i);
+        }
+        return result;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        byte[] result = new byte[rowOffsets.length];
+        for (int i = 0; i < rowOffsets.length; i++) {
+            result[i] = getByte(i);
+        }
+        return result;
+    }
+
+    @Override
+    public short[] toShortArray() {
+        short[] result = new short[rowOffsets.length];
+        for (int i = 0; i < rowOffsets.length; i++) {
+            result[i] = getShort(i);
+        }
+        return result;
+    }
+
+    @Override
+    public int[] toIntArray() {
+        int[] result = new int[rowOffsets.length];
+        for (int i = 0; i < rowOffsets.length; i++) {
+            result[i] = getInt(i);
+        }
+        return result;
+    }
+
+    @Override
+    public long[] toLongArray() {
+        long[] result = new long[rowOffsets.length];
+        for (int i = 0; i < rowOffsets.length; i++) {
+            result[i] = getLong(i);
+        }
+        return result;
+    }
+
+    @Override
+    public float[] toFloatArray() {
+        float[] result = new float[rowOffsets.length];
+        for (int i = 0; i < rowOffsets.length; i++) {
+            result[i] = getFloat(i);
+        }
+        return result;
+    }
+
+    @Override
+    public double[] toDoubleArray() {
+        double[] result = new double[rowOffsets.length];
+        for (int i = 0; i < rowOffsets.length; i++) {
+            result[i] = getDouble(i);
+        }
+        return result;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
index ce65026390..824a0c438d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
@@ -58,6 +58,19 @@ public class DataEvolutionRow implements InternalRow {
         }
     }
 
+    public void setRows(InternalRow[] rows) {
+        if (rows.length != this.rows.length) {
+            throw new IllegalArgumentException(
+                    "The length of input rows "
+                            + rows.length
+                            + " is not equal to the expected length "
+                            + this.rows.length);
+        }
+        for (int i = 0; i < rows.length; i++) {
+            setRow(i, rows[i]);
+        }
+    }
+
     private InternalRow chooseRow(int pos) {
         return rows[(rowOffsets[pos])];
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index b21c77db1d..0e3f39715d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -74,7 +74,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final SchemaManager schemaManager;
-    private final TableSchema schema;
+    protected final TableSchema schema;
 
     private Snapshot specifiedSnapshot = null;
     private boolean onlyReadRealBuckets = false;
@@ -255,6 +255,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             files.add(iterator.next());
         }
 
+        files = postFilter(files);
+
         if (wholeBucketFilterEnabled()) {
             // We group files by bucket here, and filter them by the whole 
bucket filter.
             // Why do this: because in primary key table, we can't just filter 
the value
@@ -422,6 +424,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     /** Note: Keep this thread-safe. */
     protected abstract boolean filterByStats(ManifestEntry entry);
 
+    protected abstract List<ManifestEntry> postFilter(List<ManifestEntry> 
entries);
+
     protected boolean wholeBucketFilterEnabled() {
         return false;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 21d9d3880d..be5c48539d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -33,6 +33,7 @@ import org.apache.paimon.utils.SnapshotManager;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -44,7 +45,7 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
 
     private final boolean fileIndexReadEnabled;
 
-    private Predicate inputFilter;
+    protected Predicate inputFilter;
 
     // cache not evolved filter by schema id
     private final Map<Long, Predicate> notEvolvedFilterMapping = new 
ConcurrentHashMap<>();
@@ -121,6 +122,11 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
         return testFileIndex(entry.file().embeddedIndex(), entry);
     }
 
+    @Override
+    protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+        return entries;
+    }
+
     private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, 
ManifestEntry entry) {
         if (embeddedIndexBytes == null) {
             return true;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index fad67e3183..03f36d2ea0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -18,16 +18,33 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.data.BinaryArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.DataEvolutionArray;
+import org.apache.paimon.reader.DataEvolutionRow;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsEvolution;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.SnapshotManager;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
 /** {@link FileStoreScan} for data-evolution enabled table. */
 public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
 
+    private boolean dropStats = false;
+
     public DataEvolutionFileStoreScan(
             ManifestsReader manifestsReader,
             BucketSelectConverter bucketSelectConverter,
@@ -47,10 +64,110 @@ public class DataEvolutionFileStoreScan extends 
AppendOnlyFileStoreScan {
                 false);
     }
 
+    @Override
+    public FileStoreScan dropStats() {
+        this.dropStats = true;
+        return this;
+    }
+
+    @Override
+    public FileStoreScan keepStats() {
+        this.dropStats = false;
+        return this;
+    }
+
     public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
+        this.inputFilter = predicate;
         return this;
     }
 
+    @Override
+    protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+        if (inputFilter == null) {
+            return entries;
+        }
+        List<List<ManifestEntry>> splitByRowId =
+                DataEvolutionSplitGenerator.splitManifests(entries);
+
+        return splitByRowId.stream()
+                .filter(this::filterByStats)
+                .flatMap(Collection::stream)
+                .map(entry -> dropStats ? dropStats(entry) : entry)
+                .collect(Collectors.toList());
+    }
+
+    private boolean filterByStats(List<ManifestEntry> metas) {
+        long rowCount = metas.get(0).file().rowCount();
+        SimpleStatsEvolution.Result evolutionResult = evolutionStats(metas);
+        return inputFilter.test(
+                rowCount,
+                evolutionResult.minValues(),
+                evolutionResult.maxValues(),
+                evolutionResult.nullCounts());
+    }
+
+    private SimpleStatsEvolution.Result evolutionStats(List<ManifestEntry> 
metas) {
+        int[] allFields = 
schema.fields().stream().mapToInt(DataField::id).toArray();
+        int fieldsCount = schema.fields().size();
+        int[] rowOffsets = new int[fieldsCount];
+        int[] fieldOffsets = new int[fieldsCount];
+        Arrays.fill(rowOffsets, -1);
+        Arrays.fill(fieldOffsets, -1);
+
+        InternalRow[] min = new InternalRow[metas.size()];
+        InternalRow[] max = new InternalRow[metas.size()];
+        BinaryArray[] nullCounts = new BinaryArray[metas.size()];
+
+        for (int i = 0; i < metas.size(); i++) {
+            SimpleStats stats = metas.get(i).file().valueStats();
+            min[i] = stats.minValues();
+            max[i] = stats.maxValues();
+            nullCounts[i] = stats.nullCounts();
+        }
+
+        for (int i = 0; i < metas.size(); i++) {
+            DataFileMeta fileMeta = metas.get(i).file();
+            TableSchema dataFileSchema =
+                    scanTableSchema(fileMeta.schemaId())
+                            .project(
+                                    fileMeta.valueStatsCols() == null
+                                            ? fileMeta.writeCols()
+                                            : fileMeta.valueStatsCols());
+            int[] fieldIds =
+                    
SpecialFields.rowTypeWithRowTracking(dataFileSchema.logicalRowType())
+                            .getFields().stream()
+                            .mapToInt(DataField::id)
+                            .toArray();
+
+            int count = 0;
+            for (int j = 0; j < fieldsCount; j++) {
+                for (int fieldId : fieldIds) {
+                    if (allFields[j] == fieldId) {
+                        // TODO: If type not match (e.g. int -> string), we 
need to skip this, set
+                        // rowOffsets[j] = -1 always. (may -2, after all, set 
it back to -1)
+                        // Because schema evolution may happen to change int 
to string or something
+                        // like that.
+                        if (rowOffsets[j] == -1) {
+                            rowOffsets[j] = i;
+                            fieldOffsets[j] = count++;
+                        }
+                        break;
+                    }
+                }
+            }
+        }
+
+        DataEvolutionRow finalMin = new DataEvolutionRow(metas.size(), 
rowOffsets, fieldOffsets);
+        DataEvolutionRow finalMax = new DataEvolutionRow(metas.size(), 
rowOffsets, fieldOffsets);
+        DataEvolutionArray finalNullCounts =
+                new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets);
+
+        finalMin.setRows(min);
+        finalMax.setRows(max);
+        finalNullCounts.setRows(nullCounts);
+        return new SimpleStatsEvolution.Result(finalMin, finalMax, 
finalNullCounts);
+    }
+
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 1de22d4997..38bb6cdeef 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -153,6 +153,11 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 file.rowCount(), stats.minValues(), stats.maxValues(), 
stats.nullCounts());
     }
 
+    @Override
+    protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+        return entries;
+    }
+
     @Override
     protected ManifestEntry dropStats(ManifestEntry entry) {
         if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index 05bd3ae4ed..7035bf4f49 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.utils.BinPacking;
 
 import java.util.ArrayList;
@@ -77,39 +78,63 @@ public class DataEvolutionSplitGenerator implements 
SplitGenerator {
     }
 
     public static List<List<DataFileMeta>> split(List<DataFileMeta> files) {
-        List<List<DataFileMeta>> splitByRowId = new ArrayList<>();
+        return split(
+                files,
+                DataFileMeta::fileName,
+                DataFileMeta::firstRowId,
+                DataFileMeta::rowCount,
+                DataFileMeta::maxSequenceNumber);
+    }
+
+    public static List<List<ManifestEntry>> splitManifests(List<ManifestEntry> 
entries) {
+        return split(
+                entries,
+                entry -> entry.file().fileName(),
+                entry -> entry.file().firstRowId(),
+                entry -> entry.file().rowCount(),
+                entry -> entry.file().maxSequenceNumber());
+    }
+
+    public static <T> List<List<T>> split(
+            List<T> files,
+            Function<T, String> fileNameF,
+            Function<T, Long> firstRowIdF,
+            Function<T, Long> rowCountF,
+            Function<T, Long> maxSequenceNumberF) {
+        List<List<T>> splitByRowId = new ArrayList<>();
         // Sort files by firstRowId and then by maxSequenceNumber
         files.sort(
                 Comparator.comparingLong(
-                                (ToLongFunction<DataFileMeta>)
+                                (ToLongFunction<T>)
                                         value ->
-                                                value.firstRowId() == null
+                                                firstRowIdF.apply(value) == 
null
                                                         ? Long.MIN_VALUE
-                                                        : value.firstRowId())
-                        .thenComparingInt(f -> isBlobFile(f.fileName()) ? 1 : 
0)
+                                                        : 
firstRowIdF.apply(value))
+                        .thenComparingInt(f -> isBlobFile(fileNameF.apply(f)) 
? 1 : 0)
                         .thenComparing(
                                 (f1, f2) -> {
                                     // If firstRowId is the same, we should 
read the file with
                                     // larger sequence number first. Because 
larger sequence number
                                     // file is more fresh
                                     return Long.compare(
-                                            f2.maxSequenceNumber(), 
f1.maxSequenceNumber());
+                                            maxSequenceNumberF.apply(f2),
+                                            maxSequenceNumberF.apply(f1));
                                 }));
 
-        files = filterBlob(files);
+        files = filterBlob(files, fileNameF, firstRowIdF, rowCountF);
 
         // Split files by firstRowId
         long lastRowId = -1;
         long checkRowIdStart = 0;
-        List<DataFileMeta> currentSplit = new ArrayList<>();
+        List<T> currentSplit = new ArrayList<>();
         for (int i = 0; i < files.size(); i++) {
-            DataFileMeta file = files.get(i);
-            Long firstRowId = file.firstRowId();
+            T file = files.get(i);
+            Long firstRowId = firstRowIdF.apply(file);
             if (firstRowId == null) {
                 splitByRowId.add(Collections.singletonList(file));
                 continue;
             }
-            if (!isBlobFile(file.fileName()) && firstRowId != lastRowId) {
+            if (!isBlobFile(fileNameF.apply(file)) && firstRowId != lastRowId) 
{
                 if (!currentSplit.isEmpty()) {
                     splitByRowId.add(currentSplit);
                 }
@@ -118,13 +143,13 @@ public class DataEvolutionSplitGenerator implements 
SplitGenerator {
                             String.format(
                                     "There are overlapping files in the split: 
\n %s, the wrong file is: \n %s",
                                     files.subList(Math.max(0, i - 20), 
i).stream()
-                                            .map(DataFileMeta::toString)
+                                            .map(Object::toString)
                                             .collect(Collectors.joining(",")),
                                     file));
                 }
                 currentSplit = new ArrayList<>();
                 lastRowId = firstRowId;
-                checkRowIdStart = firstRowId + file.rowCount();
+                checkRowIdStart = firstRowId + rowCountF.apply(file);
             }
             currentSplit.add(file);
         }
@@ -135,21 +160,25 @@ public class DataEvolutionSplitGenerator implements 
SplitGenerator {
         return splitByRowId;
     }
 
-    private static List<DataFileMeta> filterBlob(List<DataFileMeta> files) {
-        List<DataFileMeta> result = new ArrayList<>();
+    private static <T> List<T> filterBlob(
+            List<T> files,
+            Function<T, String> fileNameF,
+            Function<T, Long> firstRowIdF,
+            Function<T, Long> rowCountF) {
+        List<T> result = new ArrayList<>();
         long rowIdStart = -1;
         long rowIdEnd = -1;
-        for (DataFileMeta file : files) {
-            if (file.firstRowId() == null) {
+        for (T file : files) {
+            if (firstRowIdF.apply(file) == null) {
                 result.add(file);
                 continue;
             }
-            if (!isBlobFile(file.fileName())) {
-                rowIdStart = file.firstRowId();
-                rowIdEnd = file.firstRowId() + file.rowCount();
+            if (!isBlobFile(fileNameF.apply(file))) {
+                rowIdStart = firstRowIdF.apply(file);
+                rowIdEnd = firstRowIdF.apply(file) + rowCountF.apply(file);
                 result.add(file);
             } else {
-                if (file.firstRowId() >= rowIdStart && file.firstRowId() < 
rowIdEnd) {
+                if (firstRowIdF.apply(file) >= rowIdStart && 
firstRowIdF.apply(file) < rowIdEnd) {
                     result.add(file);
                 }
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index dc4db3da5b..6ddc9b410d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -417,6 +417,14 @@ public class DataEvolutionTableTest extends TableTestBase {
         readBuilder.withFilter(predicate);
         assertThat(((DataSplit) 
readBuilder.newScan().plan().splits().get(0)).dataFiles().size())
                 .isEqualTo(2);
+
+        predicate = predicateBuilder.notEqual(1, BinaryString.fromString("a"));
+        readBuilder.withFilter(predicate);
+        assertThat(readBuilder.newScan().plan().splits().isEmpty()).isTrue();
+
+        predicate = predicateBuilder.notEqual(2, BinaryString.fromString("c"));
+        readBuilder.withFilter(predicate);
+        assertThat(readBuilder.newScan().plan().splits().isEmpty()).isTrue();
     }
 
     protected Schema schemaDefault() {

Reply via email to