This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a70b56982 [core] Fix stats filter by pushdown after schema evolution 
(#5825)
2a70b56982 is described below

commit 2a70b56982d41f3abb6e073099f1929c2164d9d9
Author: yuzelin <[email protected]>
AuthorDate: Tue Jul 22 14:11:32 2025 +0800

    [core] Fix stats filter by pushdown after schema evolution (#5825)
---
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   8 +-
 .../paimon/operation/KeyValueFileStoreScan.java    |  37 ++++--
 .../apache/paimon/schema/SchemaEvolutionUtil.java  |  46 +++++--
 .../apache/paimon/stats/SimpleStatsEvolutions.java |  23 ++--
 .../apache/paimon/utils/FormatReaderMapping.java   |   4 +-
 .../test/java/org/apache/paimon/TestFileStore.java |   9 +-
 .../paimon/schema/SchemaEvolutionUtilTest.java     |   4 +-
 .../paimon/table/ColumnTypeFileDataTestBase.java   |  14 +--
 .../paimon/table/ColumnTypeFileMetaTestBase.java   | 137 +--------------------
 .../table/PrimaryKeyColumnTypeFileDataTest.java    |   5 +
 .../PrimaryKeyTableColumnTypeFileMetaTest.java     |  54 +-------
 .../apache/paimon/table/SchemaEvolutionTest.java   |  59 ++++++++-
 .../FilterPushdownWithSchemaChangeITCase.java      |  26 ++++
 13 files changed, 191 insertions(+), 235 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 9e4bf12f04..0cf3dbdb69 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -80,7 +80,9 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
-        if (filter == null) {
+        Predicate safeFilter =
+                
simpleStatsEvolutions.toEvolutionSafeStatsFilter(entry.file().schemaId(), 
filter);
+        if (safeFilter == null) {
             return true;
         }
 
@@ -91,7 +93,7 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                         entry.file().rowCount(),
                         entry.file().valueStatsCols());
 
-        return filter.test(
+        return safeFilter.test(
                         entry.file().rowCount(),
                         stats.minValues(),
                         stats.maxValues(),
@@ -110,7 +112,7 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                 dataFilterMapping.computeIfAbsent(
                         entry.file().schemaId(),
                         id ->
-                                simpleStatsEvolutions.tryDevolveFilter(
+                                
simpleStatsEvolutions.toEvolutionSafeStatsFilter(
                                         entry.file().schemaId(), filter));
 
         try (FileIndexPredicate predicate =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 9845b01346..a1b364679b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -85,10 +85,11 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 manifestFileFactory,
                 scanManifestParallelism);
         this.bucketSelectConverter = bucketSelectConverter;
+        // NOTE: don't add key prefix to field names because 
fieldKeyStatsConverters is used for
+        // filter conversion
         this.fieldKeyStatsConverters =
                 new SimpleStatsEvolutions(
-                        sid -> 
keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
-                        schema.id());
+                        sid -> 
scanTableSchema(sid).trimmedPrimaryKeysFields(), schema.id());
         this.fieldValueStatsConverters =
                 new SimpleStatsEvolutions(
                         sid -> 
keyValueFieldsExtractor.valueFields(scanTableSchema(sid)),
@@ -119,21 +120,24 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
     /** Note: Keep this thread-safe. */
     @Override
     protected boolean filterByStats(ManifestEntry entry) {
-        DataFileMeta file = entry.file();
         if (isValueFilterEnabled() && !filterByValueFilter(entry)) {
             return false;
         }
 
-        if (keyFilter != null) {
-            SimpleStatsEvolution.Result stats =
-                    fieldKeyStatsConverters
-                            .getOrCreate(file.schemaId())
-                            .evolution(file.keyStats(), file.rowCount(), null);
-            return keyFilter.test(
-                    file.rowCount(), stats.minValues(), stats.maxValues(), 
stats.nullCounts());
+        Predicate safeKeyFilter =
+                fieldKeyStatsConverters.toEvolutionSafeStatsFilter(
+                        entry.file().schemaId(), keyFilter);
+        if (safeKeyFilter == null) {
+            return true;
         }
 
-        return true;
+        DataFileMeta file = entry.file();
+        SimpleStatsEvolution.Result stats =
+                fieldKeyStatsConverters
+                        .getOrCreate(file.schemaId())
+                        .evolution(file.keyStats(), file.rowCount(), null);
+        return safeKeyFilter.test(
+                file.rowCount(), stats.minValues(), stats.maxValues(), 
stats.nullCounts());
     }
 
     @Override
@@ -156,7 +160,7 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                     schemaId2DataFilter.computeIfAbsent(
                             entry.file().schemaId(),
                             id ->
-                                    fieldValueStatsConverters.tryDevolveFilter(
+                                    
fieldValueStatsConverters.toEvolutionSafeStatsFilter(
                                             entry.file().schemaId(), 
valueFilter));
             return predicate.evaluate(dataPredicate).remain();
         } catch (IOException e) {
@@ -225,12 +229,19 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             return ((FilteredManifestEntry) entry).selected();
         }
 
+        Predicate safeValueFilter =
+                fieldValueStatsConverters.toEvolutionSafeStatsFilter(
+                        entry.file().schemaId(), valueFilter);
+        if (safeValueFilter == null) {
+            return true;
+        }
+
         DataFileMeta file = entry.file();
         SimpleStatsEvolution.Result result =
                 fieldValueStatsConverters
                         .getOrCreate(file.schemaId())
                         .evolution(file.valueStats(), file.rowCount(), 
file.valueStatsCols());
-        return valueFilter.test(
+        return safeValueFilter.test(
                         file.rowCount(),
                         result.minValues(),
                         result.maxValues(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index d30fe19abb..e7e4831386 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -131,11 +131,16 @@ public class SchemaEvolutionUtil {
      * @param tableFields the table fields
      * @param dataFields the underlying data fields
      * @param filters the filters
+     * @param forData true when devolving the filters to filter data files, false when
+     *     devolving them to filter manifest entries
      * @return the data filters
      */
     @Nullable
-    public static List<Predicate> devolveDataFilters(
-            List<DataField> tableFields, List<DataField> dataFields, 
List<Predicate> filters) {
+    public static List<Predicate> devolveFilters(
+            List<DataField> tableFields,
+            List<DataField> dataFields,
+            List<Predicate> filters,
+            boolean forData) {
         if (filters == null) {
             return null;
         }
@@ -154,19 +159,36 @@ public class SchemaEvolutionUtil {
                                     String.format("Find no field %s", 
predicate.fieldName()));
                     DataField dataField = idToDataFields.get(tableField.id());
                     if (dataField == null) {
+                        // For example, add field b and filter b, the filter 
is safe for old file
+                        // meta without field b because the index mapping 
array can handle null
+                        return forData ? Optional.empty() : 
Optional.of(predicate);
+                    }
+
+                    Optional<List<Object>> castedLiterals =
+                            CastExecutors.castLiteralsWithEvolution(
+                                    predicate.literals(), predicate.type(), 
dataField.type());
+
+                    // unsafe
+                    if (!castedLiterals.isPresent()) {
                         return Optional.empty();
                     }
 
-                    return CastExecutors.castLiteralsWithEvolution(
-                                    predicate.literals(), predicate.type(), 
dataField.type())
-                            .map(
-                                    literals ->
-                                            new LeafPredicate(
-                                                    predicate.function(),
-                                                    dataField.type(),
-                                                    indexOf(dataField, 
idToDataFields),
-                                                    dataField.name(),
-                                                    literals));
+                    if (forData) {
+                        // For data, the filter will be pushed down to the data file, so it must
+                        // use the index and literal type of the data file
+                        return Optional.of(
+                                new LeafPredicate(
+                                        predicate.function(),
+                                        dataField.type(),
+                                        indexOf(dataField, idToDataFields),
+                                        dataField.name(),
+                                        castedLiterals.get()));
+                    } else {
+                        // For meta, the index mapping array will map the index and cast the
+                        // literals, so just return the predicate itself
+                        // In other words, return it if it's safe
+                        return Optional.of(predicate);
+                    }
                 };
 
         for (Predicate predicate : filters) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java 
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
index 566cae9e65..eb44ea706c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.stats;
 
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.IndexCastMapping;
 import org.apache.paimon.schema.SchemaEvolutionUtil;
 import org.apache.paimon.types.DataField;
@@ -26,7 +27,6 @@ import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -77,18 +77,25 @@ public class SimpleStatsEvolutions {
                 });
     }
 
+    /**
+     * If the file's schema id != current table schema id, convert the filter to an
+     * evolution-safe filter, or return null if that is not possible.
+     */
     @Nullable
-    public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) {
-        if (tableSchemaId == dataSchemaId) {
+    public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable 
Predicate filter) {
+        if (filter == null || dataSchemaId == tableSchemaId) {
             return filter;
         }
+
+        // For a filter p1 && p2, if only p1 is safe, we can return just p1 for best-effort
+        // filtering and let the compute engine evaluate p2.
+        List<Predicate> filters = PredicateBuilder.splitAnd(filter);
         List<Predicate> devolved =
                 Objects.requireNonNull(
-                        SchemaEvolutionUtil.devolveDataFilters(
-                                schemaFields.apply(tableSchemaId),
-                                schemaFields.apply(dataSchemaId),
-                                Collections.singletonList(filter)));
-        return devolved.isEmpty() ? null : devolved.get(0);
+                        SchemaEvolutionUtil.devolveFilters(
+                                tableDataFields, 
schemaFields.apply(dataSchemaId), filters, false));
+
+        return devolved.isEmpty() ? null : PredicateBuilder.and(devolved);
     }
 
     public List<DataField> tableDataFields() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 8de9bf8508..1c89971742 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -308,8 +308,8 @@ public class FormatReaderMapping {
             List<Predicate> dataFilters =
                     tableSchema.id() == dataSchema.id()
                             ? filters
-                            : SchemaEvolutionUtil.devolveDataFilters(
-                                    tableSchema.fields(), dataSchema.fields(), 
filters);
+                            : SchemaEvolutionUtil.devolveFilters(
+                                    tableSchema.fields(), dataSchema.fields(), 
filters, true);
 
             // Skip pushing down partition filters to reader.
             return excludePredicateWithFields(
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index efe4c1f62c..7d303cad2d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -53,6 +53,7 @@ import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.ExpireChangelogImpl;
 import org.apache.paimon.table.ExpireSnapshots;
 import org.apache.paimon.table.ExpireSnapshotsImpl;
+import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -126,7 +127,7 @@ public class TestFileStore extends KeyValueFileStore {
                                 valueType.getFields(),
                                 valueType.getFieldCount(),
                                 partitionType.getFieldNames(),
-                                keyType.getFieldNames(),
+                                cleanPrimaryKeys(keyType.getFieldNames()),
                                 Collections.emptyMap(),
                                 null),
                 false,
@@ -148,6 +149,12 @@ public class TestFileStore extends KeyValueFileStore {
         this.commitIdentifier = 0L;
     }
 
+    private static List<String> cleanPrimaryKeys(List<String> primaryKeys) {
+        return primaryKeys.stream()
+                .map(k -> k.substring(SpecialFields.KEY_FIELD_PREFIX.length()))
+                .collect(Collectors.toList());
+    }
+
     private static SchemaManager schemaManager(String root, CoreOptions 
options) {
         return new SchemaManager(FileIOFinder.find(new Path(root)), 
options.path());
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
index 291ad0ef4f..2883b36b5c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
@@ -91,8 +91,8 @@ public class SchemaEvolutionUtilTest {
                         IsNull.INSTANCE, DataTypes.INT(), 7, "a", 
Collections.emptyList()));
 
         List<Predicate> filters =
-                SchemaEvolutionUtil.devolveDataFilters(tableFields2, 
dataFields, predicates);
-        assert filters != null;
+                SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields, 
predicates, true);
+        assertThat(filters).isNotNull();
         assertThat(filters.size()).isEqualTo(1);
 
         LeafPredicate child1 = (LeafPredicate) filters.get(0);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java
index af927377ee..5075343adf 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java
@@ -119,14 +119,8 @@ public abstract class ColumnTypeFileDataTestBase extends 
SchemaEvolutionTableTes
                 (files, schemas) -> {
                     FileStoreTable table = createFileStoreTable(schemas);
 
-                    /**
-                     * Filter field "g" in [200, 500] in SCHEMA_FIELDS which 
is updated from bigint
-                     * to float and will get another file with one data as 
followed:
-                     *
-                     * <ul>
-                     *   
<li>2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
-                     * </ul>
-                     */
+                    // Filter field "g" in [200, 500] in SCHEMA_FIELDS cannot 
filter old file 1 and
+                    // file 2 because filter is forbidden
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
@@ -139,8 +133,12 @@ public abstract class ColumnTypeFileDataTestBase extends 
SchemaEvolutionTableTes
                     List<InternalRow.FieldGetter> fieldGetterList = 
getFieldGetterList(table);
                     assertThat(getResult(table.newRead(), splits, 
fieldGetterList))
                             .containsExactlyInAnyOrder(
+                                    // old file1
+                                    
"1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110",
+                                    // old file2
                                     
"2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210",
                                     
"2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310",
+                                    // normal filtered data
                                     
"2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410");
                 },
                 getPrimaryKeyNames(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
index 7f264fa817..e92ce9b49b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
@@ -20,23 +20,16 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.stats.SimpleStatsEvolution;
-import org.apache.paimon.stats.SimpleStatsEvolutions;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.types.DataField;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -137,16 +130,8 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
                 (files, schemas) -> {
                     FileStoreTable table = createFileStoreTable(schemas);
 
-                    /*
-                     Filter field "g" in [200, 500] in SCHEMA_FIELDS which is 
updated from bigint
-                     to float and will get another file with one data as 
followed:
-
-                     <ul>
-                       
<li>2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
-                     </ul>
-
-                     <p>Then we can check the results of the two result files.
-                    */
+                    // Filter field "g" in [200, 500], get old file 1,2 
(filter is forbidden) and
+                    // new file 1
                     List<DataSplit> splits =
                             table.newSnapshotReader()
                                     .withFilter(
@@ -154,7 +139,7 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
                                                     .between(6, 200F, 500F))
                                     .read()
                                     .dataSplits();
-                    checkFilterRowCount(toDataFileMetas(splits), 3L);
+                    checkFilterRowCount(toDataFileMetas(splits), 4L);
 
                     List<String> filesName =
                             
files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
@@ -169,9 +154,6 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
                                             .map(DataFileMeta::fileName)
                                             .collect(Collectors.toList()))
                             .containsAll(filesName);
-
-                    validateValuesWithNewSchema(
-                            schemas, table.schema().id(), filesName, 
fileMetaList);
                 },
                 getPrimaryKeyNames(),
                 tableConfig,
@@ -220,125 +202,12 @@ public abstract class ColumnTypeFileMetaTestBase extends 
SchemaEvolutionTableTes
                                             .map(DataFileMeta::fileName)
                                             .collect(Collectors.toList()))
                             .containsAll(filesName);
-
-                    // Compare all columns with table column type
-                    validateValuesWithNewSchema(
-                            schemas, table.schema().id(), filesName, 
fileMetaList);
                 },
                 getPrimaryKeyNames(),
                 tableConfig,
                 this::createFileStoreTable);
     }
 
-    protected void validateValuesWithNewSchema(
-            Map<Long, TableSchema> tableSchemas,
-            long schemaId,
-            List<String> filesName,
-            List<DataFileMeta> fileMetaList) {
-        Function<Long, List<DataField>> schemaFields = id -> 
tableSchemas.get(id).fields();
-        SimpleStatsEvolutions converters = new 
SimpleStatsEvolutions(schemaFields, schemaId);
-        for (DataFileMeta fileMeta : fileMetaList) {
-            SimpleStats stats = getTableValueStats(fileMeta);
-            SimpleStatsEvolution.Result result =
-                    
converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null);
-            InternalRow min = result.minValues();
-            InternalRow max = result.maxValues();
-            assertThat(stats.minValues().getFieldCount()).isEqualTo(12);
-            if (filesName.contains(fileMeta.fileName())) {
-                checkTwoValues(min, max);
-            } else {
-                checkOneValue(min, max);
-            }
-        }
-    }
-
-    /**
-     * Check file data with one data.
-     *
-     * <ul>
-     *   <li>data:
-     *       
2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
-     *   <li>types: a->int, b->varchar[10], c->varchar[10], d->double, e->int, 
f->decimal,g->float,
-     *       h->double, i->decimal, j->date, k->date, l->varbinary
-     * </ul>
-     */
-    private void checkOneValue(InternalRow min, InternalRow max) {
-        assertThat(min.getInt(0)).isEqualTo(max.getInt(0)).isEqualTo(2);
-        assertThat(min.getString(1))
-                .isEqualTo(max.getString(1))
-                .isEqualTo(BinaryString.fromString("400"));
-        assertThat(min.getString(2))
-                .isEqualTo(max.getString(2))
-                .isEqualTo(BinaryString.fromString("401"));
-        
assertThat(min.getDouble(3)).isEqualTo(max.getDouble(3)).isEqualTo(402D);
-        assertThat(min.getInt(4)).isEqualTo(max.getInt(4)).isEqualTo(403);
-        assertThat(min.getDecimal(5, 10, 2).toBigDecimal().intValue())
-                .isEqualTo(max.getDecimal(5, 10, 2).toBigDecimal().intValue())
-                .isEqualTo(404);
-        assertThat(min.getFloat(6)).isEqualTo(max.getFloat(6)).isEqualTo(405F);
-        
assertThat(min.getDouble(7)).isEqualTo(max.getDouble(7)).isEqualTo(406D);
-        assertThat(min.getDecimal(8, 10, 2).toBigDecimal().doubleValue())
-                .isEqualTo(max.getDecimal(8, 10, 
2).toBigDecimal().doubleValue())
-                .isEqualTo(407D);
-        assertThat(min.getInt(9)).isEqualTo(max.getInt(9)).isEqualTo(408);
-        assertThat(min.getInt(10)).isEqualTo(max.getInt(10)).isEqualTo(409);
-        assertThat(min.isNullAt(11)).isEqualTo(max.isNullAt(11)).isTrue();
-    }
-
-    /**
-     * Check file with new types and data.
-     *
-     * <ul>
-     *   <li>data1: 
2,"200","201",toDecimal(202),(short)203,204,205L,206F,207D,208,toTimestamp(209 *
-     *       millsPerDay),toBytes("210")
-     *   <li>data2: 
2,"300","301",toDecimal(302),(short)303,304,305L,306F,307D,308,toTimestamp(309 *
-     *       millsPerDay),toBytes("310")
-     *   <li>old types: a->int, b->char[10], c->varchar[10], d->decimal, 
e->smallint, f->int,
-     *       g->bigint, h->float, i->double, j->date, k->timestamp, l->binary
-     *   <li>new types: a->int, b->varchar[10], c->varchar[10], d->double, 
e->int,
-     *       f->decimal,g->float, h->double, i->decimal, j->date, k->date, 
l->varbinary
-     * </ul>
-     */
-    private void checkTwoValues(InternalRow min, InternalRow max) {
-        assertThat(min.getInt(0)).isEqualTo(2);
-        assertThat(max.getInt(0)).isEqualTo(2);
-
-        // parquet does not support padding
-        assertThat(min.getString(1).toString()).startsWith("200");
-        assertThat(max.getString(1).toString()).startsWith("300");
-
-        assertThat(min.getString(2)).isEqualTo(BinaryString.fromString("201"));
-        assertThat(max.getString(2)).isEqualTo(BinaryString.fromString("301"));
-
-        assertThat(min.getDouble(3)).isEqualTo(202D);
-        assertThat(max.getDouble(3)).isEqualTo(302D);
-
-        assertThat(min.getInt(4)).isEqualTo(203);
-        assertThat(max.getInt(4)).isEqualTo(303);
-
-        assertThat(min.getDecimal(5, 10, 
2).toBigDecimal().intValue()).isEqualTo(204);
-        assertThat(max.getDecimal(5, 10, 
2).toBigDecimal().intValue()).isEqualTo(304);
-
-        assertThat(min.getFloat(6)).isEqualTo(205F);
-        assertThat(max.getFloat(6)).isEqualTo(305F);
-
-        assertThat(min.getDouble(7)).isEqualTo(206D);
-        assertThat(max.getDouble(7)).isEqualTo(306D);
-
-        assertThat(min.getDecimal(8, 10, 
2).toBigDecimal().doubleValue()).isEqualTo(207D);
-        assertThat(max.getDecimal(8, 10, 
2).toBigDecimal().doubleValue()).isEqualTo(307D);
-
-        assertThat(min.getInt(9)).isEqualTo(208);
-        assertThat(max.getInt(9)).isEqualTo(308);
-
-        assertThat(min.getInt(10)).isEqualTo(209);
-        assertThat(max.getInt(10)).isEqualTo(309);
-
-        // Min and max value of binary type is null
-        assertThat(min.isNullAt(11)).isTrue();
-        assertThat(max.isNullAt(11)).isTrue();
-    }
-
     @Override
     protected List<String> getPrimaryKeyNames() {
         return SCHEMA_PRIMARY_KEYS;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
index 64bb5f21ab..066bd6c57d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
@@ -65,6 +65,8 @@ public class PrimaryKeyColumnTypeFileDataTest extends 
ColumnTypeFileDataTestBase
                 (files, schemas) -> {
                     FileStoreTable table = createFileStoreTable(schemas);
 
+                    // filter g: old file cannot apply filter, so the new file 
in the same partition
+                    // also cannot be filtered, the result contains all data
                     List<Split> splits =
                             toSplits(
                                     table.newSnapshotReader()
@@ -77,6 +79,9 @@ public class PrimaryKeyColumnTypeFileDataTest extends 
ColumnTypeFileDataTestBase
                     List<InternalRow.FieldGetter> fieldGetterList = 
getFieldGetterList(table);
                     assertThat(getResult(table.newRead(), splits, 
fieldGetterList))
                             .containsExactlyInAnyOrder(
+                                    
"1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110",
+                                    
"1|500|501|502.0|503|504.00|505.0|506.0|507.00|508|509|510",
+                                    
"1|600|601|602.0|603|604.00|605.0|606.0|607.00|608|609|610",
                                     
"2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210",
                                     
"2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310",
                                     
"2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410");
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
index 32a4138be5..359d5e7b9d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
@@ -18,25 +18,19 @@
 
 package org.apache.paimon.table;
 
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.stats.SimpleStatsEvolution;
-import org.apache.paimon.stats.SimpleStatsEvolutions;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.types.DataField;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -105,54 +99,12 @@ public class PrimaryKeyTableColumnTypeFileMetaTest extends ColumnTypeFileMetaTes
                                                     .between(6, 200F, 500F))
                                     .read()
                                     .dataSplits();
-                    // filtered and only 3 rows left
-                    checkFilterRowCount(toDataFileMetas(splits), 3L);
+                    // filter g: old file cannot apply filter, so the new file in the same partition
+                    // also cannot be filtered, the result contains all data
+                    checkFilterRowCount(toDataFileMetas(splits), 6L);
                 },
                 getPrimaryKeyNames(),
                 tableConfig,
                 this::createFileStoreTable);
     }
-
-    /** We can only validate the values in primary keys for changelog with key table. */
-    @Override
-    protected void validateValuesWithNewSchema(
-            Map<Long, TableSchema> tableSchemas,
-            long schemaId,
-            List<String> filesName,
-            List<DataFileMeta> fileMetaList) {
-        Function<Long, List<DataField>> schemaFields =
-                id -> tableSchemas.get(id).logicalTrimmedPrimaryKeysType().getFields();
-        SimpleStatsEvolutions converters = new SimpleStatsEvolutions(schemaFields, schemaId);
-        for (DataFileMeta fileMeta : fileMetaList) {
-            SimpleStats stats = getTableValueStats(fileMeta);
-            SimpleStatsEvolution.Result result =
-                    converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null);
-            InternalRow min = result.minValues();
-            InternalRow max = result.maxValues();
-            assertThat(min.getFieldCount()).isEqualTo(4);
-            if (filesName.contains(fileMeta.fileName())) {
-                // parquet does not support padding
-                assertThat(min.getString(0).toString()).startsWith("200");
-                assertThat(max.getString(0).toString()).startsWith("300");
-
-                assertThat(min.getString(1)).isEqualTo(BinaryString.fromString("201"));
-                assertThat(max.getString(1)).isEqualTo(BinaryString.fromString("301"));
-
-                assertThat(min.getDouble(2)).isEqualTo(202D);
-                assertThat(max.getDouble(2)).isEqualTo(302D);
-
-                assertThat(min.getInt(3)).isEqualTo(203);
-                assertThat(max.getInt(3)).isEqualTo(303);
-            } else {
-                assertThat(min.getString(0))
-                        .isEqualTo(max.getString(0))
-                        .isEqualTo(BinaryString.fromString("400"));
-                assertThat(min.getString(1))
-                        .isEqualTo(max.getString(1))
-                        .isEqualTo(BinaryString.fromString("401"));
-                assertThat(min.getDouble(2)).isEqualTo(max.getDouble(2)).isEqualTo(402D);
-                assertThat(min.getInt(3)).isEqualTo(max.getInt(3)).isEqualTo(403);
-            }
-        }
-    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 5d20e46ec2..24955dc94a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.DataFormatTestUtil;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -34,6 +35,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -405,6 +407,59 @@ public class SchemaEvolutionTest {
                                         "_VALUE_KIND", SYSTEM_FIELD_NAMES)));
     }
 
+    @Test
+    public void testPushDownEvolutionSafeFilter() throws Exception {
+        Schema schema =
+                new Schema(
+                        RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonMap("write-only", "true"),
+                        "");
+        schemaManager.createTable(schema);
+
+        FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                TableCommitImpl commit = table.newCommit(commitUser)) {
+            // file 1
+            write.write(GenericRow.of(1, 1L));
+            commit.commit(1, write.prepareCommit(false, 1));
+
+            // file 2
+            write.write(GenericRow.of(2, 2L));
+            commit.commit(2, write.prepareCommit(false, 2));
+        }
+
+        schemaManager.commitChanges(
+                Collections.singletonList(SchemaChange.updateColumnType("f0", DataTypes.STRING())));
+        table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+
+        try (StreamTableWrite write = table.newWrite(commitUser);
+                TableCommitImpl commit = table.newCommit(commitUser)) {
+            // file 3
+            write.write(GenericRow.of(BinaryString.fromString("0"), 3L));
+            commit.commit(3, write.prepareCommit(false, 3));
+
+            // file 4
+            write.write(GenericRow.of(BinaryString.fromString("3"), 3L));
+            commit.commit(4, write.prepareCommit(false, 4));
+        }
+
+        PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType());
+        // p: f0 >= '3' && f1 >= 2L
+        Predicate p =
+                PredicateBuilder.and(
+                        builder.greaterOrEqual(0, BinaryString.fromString("3")),
+                        builder.greaterOrEqual(1, 2L));
+        // file 1 will be filtered by f1 >= 2L
+        // file 2 won't be filtered because f0 >= '3' is not safe
+        // file 3 will be filtered by f0 >= '3'
+        // file 4 won't be filtered
+        List<String> rows = readRecords(table, p);
+        assertThat(rows).containsExactlyInAnyOrder("2, 2", "3, 3");
+    }
+
     private List<String> readRecords(FileStoreTable table, Predicate filter) throws IOException {
         List<String> results = new ArrayList<>();
         forEachRemaining(
@@ -423,7 +478,9 @@ public class SchemaEvolutionTest {
         if (filter != null) {
             snapshotReader.withFilter(filter);
         }
-        for (Split split : snapshotReader.read().dataSplits()) {
+        List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
+
+        for (Split split : dataSplits) {
             InnerTableRead read = table.newRead();
             if (filter != null) {
                 read.withFilter(filter);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
index 3b12ceabe2..73ea8f0ae1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
@@ -230,4 +230,30 @@ public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase {
         assertThat(sql("SELECT * FROM T WHERE f = 1")).containsExactly(Row.of(1, 1));
         assertThat(sql("SELECT * FROM T WHERE f <> 1")).containsExactly(Row.of(2, 111));
     }
+
+    @TestTemplate
+    public void testAppendStringToNumericForStatsFilter() {
+        sql("CREATE TABLE T (a STRING)");
+        sql("INSERT INTO T VALUES ('9'), ('10'), ('11')");
+        sql("ALTER TABLE T MODIFY (a INT)");
+
+        assertThat(sql("SELECT * FROM T WHERE a > 9"))
+                .containsExactlyInAnyOrder(Row.of(10), Row.of(11));
+    }
+
+    @TestTemplate
+    public void testPrimaryStringToNumericForStatsFilter() {
+        sql("CREATE TABLE T (pk STRING PRIMARY KEY NOT ENFORCED, v STRING)");
+        sql("INSERT INTO T VALUES ('9', '9'), ('10', '10'), ('11', '11')");
+
+        // key filter
+        sql("ALTER TABLE T MODIFY (pk INT)");
+        assertThat(sql("SELECT * FROM T WHERE pk > 9"))
+                .containsExactlyInAnyOrder(Row.of(10, "10"), Row.of(11, "11"));
+
+        // value filter
+        sql("ALTER TABLE T MODIFY (v INT)");
+        assertThat(sql("SELECT * FROM T WHERE v > 9"))
+                .containsExactlyInAnyOrder(Row.of(10, 10), Row.of(11, 11));
+    }
 }


Reply via email to