This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new af0d4162e8 [core] Refactor evolved and not evolved filters in file 
store scan (#5934)
af0d4162e8 is described below

commit af0d4162e87038baaa08fd9e76d87a8fcebb59e4
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 22 17:53:22 2025 +0800

    [core] Refactor evolved and not evolved filters in file store scan (#5934)
---
 .../paimon/operation/AppendOnlyFileStoreScan.java  | 46 ++++++++++++++------
 .../paimon/operation/KeyValueFileStoreScan.java    | 44 +++++++++++++------
 .../apache/paimon/schema/SchemaEvolutionUtil.java  | 49 +++++++---------------
 .../apache/paimon/stats/SimpleStatsEvolutions.java | 38 ++++++++++++++---
 .../apache/paimon/utils/FormatReaderMapping.java   |  2 +-
 .../paimon/schema/SchemaEvolutionUtilTest.java     |  2 +-
 6 files changed, 114 insertions(+), 67 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 0cf3dbdb69..21d9d3880d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -44,10 +44,13 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
 
     private final boolean fileIndexReadEnabled;
 
-    private Predicate filter;
+    private Predicate inputFilter;
 
-    // just cache.
-    private final Map<Long, Predicate> dataFilterMapping = new 
ConcurrentHashMap<>();
+    // cache not evolved filter by schema id
+    private final Map<Long, Predicate> notEvolvedFilterMapping = new 
ConcurrentHashMap<>();
+
+    // cache evolved filter by schema id
+    private final Map<Long, Predicate> evolvedFilterMapping = new 
ConcurrentHashMap<>();
 
     public AppendOnlyFileStoreScan(
             ManifestsReader manifestsReader,
@@ -72,7 +75,7 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
     }
 
     public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
-        this.filter = predicate;
+        this.inputFilter = predicate;
         
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
         return this;
     }
@@ -80,9 +83,15 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
-        Predicate safeFilter =
-                
simpleStatsEvolutions.toEvolutionSafeStatsFilter(entry.file().schemaId(), 
filter);
-        if (safeFilter == null) {
+        Predicate notEvolvedFilter =
+                notEvolvedFilterMapping.computeIfAbsent(
+                        entry.file().schemaId(),
+                        id ->
+                                // keepNewFieldFilter to handle add field
+                                // for example, add field 'c', 'c > 3': old 
files can be filtered
+                                simpleStatsEvolutions.filterUnsafeFilter(
+                                        entry.file().schemaId(), inputFilter, 
true));
+        if (notEvolvedFilter == null) {
             return true;
         }
 
@@ -93,12 +102,23 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                         entry.file().rowCount(),
                         entry.file().valueStatsCols());
 
-        return safeFilter.test(
+        // filter by min max
+        boolean result =
+                notEvolvedFilter.test(
                         entry.file().rowCount(),
                         stats.minValues(),
                         stats.maxValues(),
-                        stats.nullCounts())
-                && (!fileIndexReadEnabled || 
testFileIndex(entry.file().embeddedIndex(), entry));
+                        stats.nullCounts());
+
+        if (!result) {
+            return false;
+        }
+
+        if (!fileIndexReadEnabled) {
+            return true;
+        }
+
+        return testFileIndex(entry.file().embeddedIndex(), entry);
     }
 
     private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, 
ManifestEntry entry) {
@@ -109,11 +129,11 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
         RowType dataRowType = 
scanTableSchema(entry.file().schemaId()).logicalRowType();
 
         Predicate dataPredicate =
-                dataFilterMapping.computeIfAbsent(
+                evolvedFilterMapping.computeIfAbsent(
                         entry.file().schemaId(),
                         id ->
-                                
simpleStatsEvolutions.toEvolutionSafeStatsFilter(
-                                        entry.file().schemaId(), filter));
+                                simpleStatsEvolutions.tryDevolveFilter(
+                                        entry.file().schemaId(), inputFilter));
 
         try (FileIndexPredicate predicate =
                 new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index a1b364679b..1de22d4997 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -61,9 +61,17 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
 
     private Predicate keyFilter;
     private Predicate valueFilter;
-    private final Map<Long, Predicate> schemaId2DataFilter = new 
ConcurrentHashMap<>();
     private boolean valueFilterForceEnabled = false;
 
+    // cache not evolved filter by schema id
+    private final Map<Long, Predicate> notEvolvedKeyFilterMapping = new 
ConcurrentHashMap<>();
+
+    // cache not evolved filter by schema id
+    private final Map<Long, Predicate> notEvolvedValueFilterMapping = new 
ConcurrentHashMap<>();
+
+    // cache evolved filter by schema id
+    private final Map<Long, Predicate> evolvedValueFilterMapping = new 
ConcurrentHashMap<>();
+
     public KeyValueFileStoreScan(
             ManifestsReader manifestsReader,
             BucketSelectConverter bucketSelectConverter,
@@ -124,10 +132,15 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             return false;
         }
 
-        Predicate safeKeyFilter =
-                fieldKeyStatsConverters.toEvolutionSafeStatsFilter(
-                        entry.file().schemaId(), keyFilter);
-        if (safeKeyFilter == null) {
+        Predicate notEvolvedFilter =
+                notEvolvedKeyFilterMapping.computeIfAbsent(
+                        entry.file().schemaId(),
+                        id ->
+                                // keepNewFieldFilter to handle add field
+                                // for example, add field 'c', 'c > 3': old 
files can be filtered
+                                fieldKeyStatsConverters.filterUnsafeFilter(
+                                        entry.file().schemaId(), keyFilter, 
true));
+        if (notEvolvedFilter == null) {
             return true;
         }
 
@@ -136,7 +149,7 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 fieldKeyStatsConverters
                         .getOrCreate(file.schemaId())
                         .evolution(file.keyStats(), file.rowCount(), null);
-        return safeKeyFilter.test(
+        return notEvolvedFilter.test(
                 file.rowCount(), stats.minValues(), stats.maxValues(), 
stats.nullCounts());
     }
 
@@ -157,10 +170,10 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
         try (FileIndexPredicate predicate =
                 new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
             Predicate dataPredicate =
-                    schemaId2DataFilter.computeIfAbsent(
+                    evolvedValueFilterMapping.computeIfAbsent(
                             entry.file().schemaId(),
                             id ->
-                                    
fieldValueStatsConverters.toEvolutionSafeStatsFilter(
+                                    fieldValueStatsConverters.tryDevolveFilter(
                                             entry.file().schemaId(), 
valueFilter));
             return predicate.evaluate(dataPredicate).remain();
         } catch (IOException e) {
@@ -229,10 +242,15 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             return ((FilteredManifestEntry) entry).selected();
         }
 
-        Predicate safeValueFilter =
-                fieldValueStatsConverters.toEvolutionSafeStatsFilter(
-                        entry.file().schemaId(), valueFilter);
-        if (safeValueFilter == null) {
+        Predicate notEvolvedFilter =
+                notEvolvedValueFilterMapping.computeIfAbsent(
+                        entry.file().schemaId(),
+                        id ->
+                                // keepNewFieldFilter to handle add field
+                                // for example, add field 'c', 'c > 3': old 
files can be filtered
+                                fieldValueStatsConverters.filterUnsafeFilter(
+                                        entry.file().schemaId(), valueFilter, 
true));
+        if (notEvolvedFilter == null) {
             return true;
         }
 
@@ -241,7 +259,7 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 fieldValueStatsConverters
                         .getOrCreate(file.schemaId())
                         .evolution(file.valueStats(), file.rowCount(), 
file.valueStatsCols());
-        return safeValueFilter.test(
+        return notEvolvedFilter.test(
                         file.rowCount(),
                         result.minValues(),
                         result.maxValues(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index e7e4831386..a9075fb6f2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -42,6 +42,7 @@ import org.apache.paimon.utils.ProjectedRow;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -131,18 +132,17 @@ public class SchemaEvolutionUtil {
      * @param tableFields the table fields
      * @param dataFields the underlying data fields
      * @param filters the filters
-     * @param forData true if devolve the filters for filtering data file, 
otherwise, for filtering
-     *     manifest entry
+     * @param keepNewFieldFilter true if keep new field filter, the new field 
filter needs to be
+     *     properly handled
      * @return the data filters
      */
-    @Nullable
     public static List<Predicate> devolveFilters(
             List<DataField> tableFields,
             List<DataField> dataFields,
             List<Predicate> filters,
-            boolean forData) {
+            boolean keepNewFieldFilter) {
         if (filters == null) {
-            return null;
+            return Collections.emptyList();
         }
 
         Map<String, DataField> nameToTableFields =
@@ -159,36 +159,19 @@ public class SchemaEvolutionUtil {
                                     String.format("Find no field %s", 
predicate.fieldName()));
                     DataField dataField = idToDataFields.get(tableField.id());
                     if (dataField == null) {
-                        // For example, add field b and filter b, the filter 
is safe for old file
-                        // meta without field b because the index mapping 
array can handle null
-                        return forData ? Optional.empty() : 
Optional.of(predicate);
+                        return keepNewFieldFilter ? Optional.of(predicate) : 
Optional.empty();
                     }
 
-                    Optional<List<Object>> castedLiterals =
-                            CastExecutors.castLiteralsWithEvolution(
-                                    predicate.literals(), predicate.type(), 
dataField.type());
-
-                    // unsafe
-                    if (!castedLiterals.isPresent()) {
-                        return Optional.empty();
-                    }
-
-                    if (forData) {
-                        // For data, the filter will be pushdown to data file, 
so must use the index
-                        // and literal type of data file
-                        return Optional.of(
-                                new LeafPredicate(
-                                        predicate.function(),
-                                        dataField.type(),
-                                        indexOf(dataField, idToDataFields),
-                                        dataField.name(),
-                                        castedLiterals.get()));
-                    } else {
-                        // For meta, the index mapping array will map the 
index the cast the
-                        // literals, so just return self
-                        // In other words, return it if it's safe
-                        return Optional.of(predicate);
-                    }
+                    return CastExecutors.castLiteralsWithEvolution(
+                                    predicate.literals(), predicate.type(), 
dataField.type())
+                            .map(
+                                    literals ->
+                                            new LeafPredicate(
+                                                    predicate.function(),
+                                                    dataField.type(),
+                                                    indexOf(dataField, 
idToDataFields),
+                                                    dataField.name(),
+                                                    literals));
                 };
 
         for (Predicate predicate : filters) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
index eb44ea706c..682728f9a0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
@@ -21,20 +21,21 @@ package org.apache.paimon.stats;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.IndexCastMapping;
-import org.apache.paimon.schema.SchemaEvolutionUtil;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
+import static java.util.Collections.singletonList;
 import static 
org.apache.paimon.schema.SchemaEvolutionUtil.createIndexCastMapping;
+import static org.apache.paimon.schema.SchemaEvolutionUtil.devolveFilters;
 
 /** Converters to create col stats array serializer. */
 public class SimpleStatsEvolutions {
@@ -82,7 +83,7 @@ public class SimpleStatsEvolutions {
      * filter or null if can't.
      */
     @Nullable
-    public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable 
Predicate filter) {
+    public Predicate tryDevolveFilter(long dataSchemaId, @Nullable Predicate 
filter) {
         if (filter == null || dataSchemaId == tableSchemaId) {
             return filter;
         }
@@ -91,13 +92,38 @@ public class SimpleStatsEvolutions {
         // compute engine to perform p2.
         List<Predicate> filters = PredicateBuilder.splitAnd(filter);
         List<Predicate> devolved =
-                Objects.requireNonNull(
-                        SchemaEvolutionUtil.devolveFilters(
-                                tableDataFields, 
schemaFields.apply(dataSchemaId), filters, false));
+                devolveFilters(tableDataFields, 
schemaFields.apply(dataSchemaId), filters, false);
 
         return devolved.isEmpty() ? null : PredicateBuilder.and(devolved);
     }
 
+    /**
+     * Filter unsafe filter, for example, filter is 'a > 9', old type is 
String, new type is Int, if
+     * records are 9, 10 and 11, the evolved filter is not safe.
+     */
+    @Nullable
+    public Predicate filterUnsafeFilter(
+            long dataSchemaId, @Nullable Predicate filter, boolean 
keepNewFieldFilter) {
+        if (filter == null || dataSchemaId == tableSchemaId) {
+            return filter;
+        }
+
+        List<Predicate> filters = PredicateBuilder.splitAnd(filter);
+        List<DataField> oldSchema = schemaFields.apply(dataSchemaId);
+        List<Predicate> result = new ArrayList<>();
+        for (Predicate predicate : filters) {
+            if (!devolveFilters(
+                            tableDataFields,
+                            oldSchema,
+                            singletonList(predicate),
+                            keepNewFieldFilter)
+                    .isEmpty()) {
+                result.add(predicate);
+            }
+        }
+        return result.isEmpty() ? null : PredicateBuilder.and(result);
+    }
+
     public List<DataField> tableDataFields() {
         return tableDataFields;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 1c89971742..341199d1bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -309,7 +309,7 @@ public class FormatReaderMapping {
                     tableSchema.id() == dataSchema.id()
                             ? filters
                             : SchemaEvolutionUtil.devolveFilters(
-                                    tableSchema.fields(), dataSchema.fields(), 
filters, true);
+                                    tableSchema.fields(), dataSchema.fields(), 
filters, false);
 
             // Skip pushing down partition filters to reader.
             return excludePredicateWithFields(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
index 2883b36b5c..9466e69db3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
@@ -91,7 +91,7 @@ public class SchemaEvolutionUtilTest {
                         IsNull.INSTANCE, DataTypes.INT(), 7, "a", 
Collections.emptyList()));
 
         List<Predicate> filters =
-                SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields, 
predicates, true);
+                SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields, 
predicates, false);
         assertThat(filters).isNotNull();
         assertThat(filters.size()).isEqualTo(1);
 

Reply via email to