This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2a70b56982 [core] Fix stats filter by pushdown after schema evolution
(#5825)
2a70b56982 is described below
commit 2a70b56982d41f3abb6e073099f1929c2164d9d9
Author: yuzelin <[email protected]>
AuthorDate: Tue Jul 22 14:11:32 2025 +0800
[core] Fix stats filter by pushdown after schema evolution (#5825)
---
.../paimon/operation/AppendOnlyFileStoreScan.java | 8 +-
.../paimon/operation/KeyValueFileStoreScan.java | 37 ++++--
.../apache/paimon/schema/SchemaEvolutionUtil.java | 46 +++++--
.../apache/paimon/stats/SimpleStatsEvolutions.java | 23 ++--
.../apache/paimon/utils/FormatReaderMapping.java | 4 +-
.../test/java/org/apache/paimon/TestFileStore.java | 9 +-
.../paimon/schema/SchemaEvolutionUtilTest.java | 4 +-
.../paimon/table/ColumnTypeFileDataTestBase.java | 14 +--
.../paimon/table/ColumnTypeFileMetaTestBase.java | 137 +--------------------
.../table/PrimaryKeyColumnTypeFileDataTest.java | 5 +
.../PrimaryKeyTableColumnTypeFileMetaTest.java | 54 +-------
.../apache/paimon/table/SchemaEvolutionTest.java | 59 ++++++++-
.../FilterPushdownWithSchemaChangeITCase.java | 26 ++++
13 files changed, 191 insertions(+), 235 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 9e4bf12f04..0cf3dbdb69 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -80,7 +80,9 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
- if (filter == null) {
+ Predicate safeFilter =
+
simpleStatsEvolutions.toEvolutionSafeStatsFilter(entry.file().schemaId(),
filter);
+ if (safeFilter == null) {
return true;
}
@@ -91,7 +93,7 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
entry.file().rowCount(),
entry.file().valueStatsCols());
- return filter.test(
+ return safeFilter.test(
entry.file().rowCount(),
stats.minValues(),
stats.maxValues(),
@@ -110,7 +112,7 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
dataFilterMapping.computeIfAbsent(
entry.file().schemaId(),
id ->
- simpleStatsEvolutions.tryDevolveFilter(
+
simpleStatsEvolutions.toEvolutionSafeStatsFilter(
entry.file().schemaId(), filter));
try (FileIndexPredicate predicate =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 9845b01346..a1b364679b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -85,10 +85,11 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
manifestFileFactory,
scanManifestParallelism);
this.bucketSelectConverter = bucketSelectConverter;
+ // NOTE: don't add key prefix to field names because
fieldKeyStatsConverters is used for
+ // filter conversion
this.fieldKeyStatsConverters =
new SimpleStatsEvolutions(
- sid ->
keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
- schema.id());
+ sid ->
scanTableSchema(sid).trimmedPrimaryKeysFields(), schema.id());
this.fieldValueStatsConverters =
new SimpleStatsEvolutions(
sid ->
keyValueFieldsExtractor.valueFields(scanTableSchema(sid)),
@@ -119,21 +120,24 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
- DataFileMeta file = entry.file();
if (isValueFilterEnabled() && !filterByValueFilter(entry)) {
return false;
}
- if (keyFilter != null) {
- SimpleStatsEvolution.Result stats =
- fieldKeyStatsConverters
- .getOrCreate(file.schemaId())
- .evolution(file.keyStats(), file.rowCount(), null);
- return keyFilter.test(
- file.rowCount(), stats.minValues(), stats.maxValues(),
stats.nullCounts());
+ Predicate safeKeyFilter =
+ fieldKeyStatsConverters.toEvolutionSafeStatsFilter(
+ entry.file().schemaId(), keyFilter);
+ if (safeKeyFilter == null) {
+ return true;
}
- return true;
+ DataFileMeta file = entry.file();
+ SimpleStatsEvolution.Result stats =
+ fieldKeyStatsConverters
+ .getOrCreate(file.schemaId())
+ .evolution(file.keyStats(), file.rowCount(), null);
+ return safeKeyFilter.test(
+ file.rowCount(), stats.minValues(), stats.maxValues(),
stats.nullCounts());
}
@Override
@@ -156,7 +160,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
schemaId2DataFilter.computeIfAbsent(
entry.file().schemaId(),
id ->
- fieldValueStatsConverters.tryDevolveFilter(
+
fieldValueStatsConverters.toEvolutionSafeStatsFilter(
entry.file().schemaId(),
valueFilter));
return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
@@ -225,12 +229,19 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
return ((FilteredManifestEntry) entry).selected();
}
+ Predicate safeValueFilter =
+ fieldValueStatsConverters.toEvolutionSafeStatsFilter(
+ entry.file().schemaId(), valueFilter);
+ if (safeValueFilter == null) {
+ return true;
+ }
+
DataFileMeta file = entry.file();
SimpleStatsEvolution.Result result =
fieldValueStatsConverters
.getOrCreate(file.schemaId())
.evolution(file.valueStats(), file.rowCount(),
file.valueStatsCols());
- return valueFilter.test(
+ return safeValueFilter.test(
file.rowCount(),
result.minValues(),
result.maxValues(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index d30fe19abb..e7e4831386 100644
---
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -131,11 +131,16 @@ public class SchemaEvolutionUtil {
* @param tableFields the table fields
* @param dataFields the underlying data fields
* @param filters the filters
+     * @param forData true if devolving the filters for filtering data files;
otherwise, for filtering
+     *     manifest entries
* @return the data filters
*/
@Nullable
- public static List<Predicate> devolveDataFilters(
- List<DataField> tableFields, List<DataField> dataFields,
List<Predicate> filters) {
+ public static List<Predicate> devolveFilters(
+ List<DataField> tableFields,
+ List<DataField> dataFields,
+ List<Predicate> filters,
+ boolean forData) {
if (filters == null) {
return null;
}
@@ -154,19 +159,36 @@ public class SchemaEvolutionUtil {
String.format("Find no field %s",
predicate.fieldName()));
DataField dataField = idToDataFields.get(tableField.id());
if (dataField == null) {
+ // For example, add field b and filter b, the filter
is safe for old file
+ // meta without field b because the index mapping
array can handle null
+ return forData ? Optional.empty() :
Optional.of(predicate);
+ }
+
+ Optional<List<Object>> castedLiterals =
+ CastExecutors.castLiteralsWithEvolution(
+ predicate.literals(), predicate.type(),
dataField.type());
+
+ // unsafe
+ if (!castedLiterals.isPresent()) {
return Optional.empty();
}
- return CastExecutors.castLiteralsWithEvolution(
- predicate.literals(), predicate.type(),
dataField.type())
- .map(
- literals ->
- new LeafPredicate(
- predicate.function(),
- dataField.type(),
- indexOf(dataField,
idToDataFields),
- dataField.name(),
- literals));
+ if (forData) {
+ // For data, the filter will be pushdown to data file,
so must use the index
+ // and literal type of data file
+ return Optional.of(
+ new LeafPredicate(
+ predicate.function(),
+ dataField.type(),
+ indexOf(dataField, idToDataFields),
+ dataField.name(),
+ castedLiterals.get()));
+ } else {
+                        // For meta, the index mapping array will map the
index and cast the
+                        // literals, so just return the predicate itself
+                        // In other words, return it if it's safe
+ return Optional.of(predicate);
+ }
};
for (Predicate predicate : filters) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
index 566cae9e65..eb44ea706c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
+++
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
@@ -19,6 +19,7 @@
package org.apache.paimon.stats;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.types.DataField;
@@ -26,7 +27,6 @@ import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,18 +77,25 @@ public class SimpleStatsEvolutions {
});
}
+ /**
+ * If the file's schema id != current table schema id, convert the filter
to evolution safe
+ * filter or null if can't.
+ */
@Nullable
- public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) {
- if (tableSchemaId == dataSchemaId) {
+ public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable
Predicate filter) {
+ if (filter == null || dataSchemaId == tableSchemaId) {
return filter;
}
+
+        // For a filter p1 && p2: if only p1 is safe, we can return only p1 to
filter as much as possible and let the
+        // compute engine perform p2.
+ List<Predicate> filters = PredicateBuilder.splitAnd(filter);
List<Predicate> devolved =
Objects.requireNonNull(
- SchemaEvolutionUtil.devolveDataFilters(
- schemaFields.apply(tableSchemaId),
- schemaFields.apply(dataSchemaId),
- Collections.singletonList(filter)));
- return devolved.isEmpty() ? null : devolved.get(0);
+ SchemaEvolutionUtil.devolveFilters(
+ tableDataFields,
schemaFields.apply(dataSchemaId), filters, false));
+
+ return devolved.isEmpty() ? null : PredicateBuilder.and(devolved);
}
public List<DataField> tableDataFields() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 8de9bf8508..1c89971742 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -308,8 +308,8 @@ public class FormatReaderMapping {
List<Predicate> dataFilters =
tableSchema.id() == dataSchema.id()
? filters
- : SchemaEvolutionUtil.devolveDataFilters(
- tableSchema.fields(), dataSchema.fields(),
filters);
+ : SchemaEvolutionUtil.devolveFilters(
+ tableSchema.fields(), dataSchema.fields(),
filters, true);
// Skip pushing down partition filters to reader.
return excludePredicateWithFields(
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index efe4c1f62c..7d303cad2d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -53,6 +53,7 @@ import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.ExpireChangelogImpl;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.ExpireSnapshotsImpl;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
@@ -126,7 +127,7 @@ public class TestFileStore extends KeyValueFileStore {
valueType.getFields(),
valueType.getFieldCount(),
partitionType.getFieldNames(),
- keyType.getFieldNames(),
+ cleanPrimaryKeys(keyType.getFieldNames()),
Collections.emptyMap(),
null),
false,
@@ -148,6 +149,12 @@ public class TestFileStore extends KeyValueFileStore {
this.commitIdentifier = 0L;
}
+ private static List<String> cleanPrimaryKeys(List<String> primaryKeys) {
+ return primaryKeys.stream()
+ .map(k -> k.substring(SpecialFields.KEY_FIELD_PREFIX.length()))
+ .collect(Collectors.toList());
+ }
+
private static SchemaManager schemaManager(String root, CoreOptions
options) {
return new SchemaManager(FileIOFinder.find(new Path(root)),
options.path());
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
index 291ad0ef4f..2883b36b5c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
@@ -91,8 +91,8 @@ public class SchemaEvolutionUtilTest {
IsNull.INSTANCE, DataTypes.INT(), 7, "a",
Collections.emptyList()));
List<Predicate> filters =
- SchemaEvolutionUtil.devolveDataFilters(tableFields2,
dataFields, predicates);
- assert filters != null;
+ SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields,
predicates, true);
+ assertThat(filters).isNotNull();
assertThat(filters.size()).isEqualTo(1);
LeafPredicate child1 = (LeafPredicate) filters.get(0);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java
index af927377ee..5075343adf 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java
@@ -119,14 +119,8 @@ public abstract class ColumnTypeFileDataTestBase extends
SchemaEvolutionTableTes
(files, schemas) -> {
FileStoreTable table = createFileStoreTable(schemas);
- /**
- * Filter field "g" in [200, 500] in SCHEMA_FIELDS which
is updated from bigint
- * to float and will get another file with one data as
followed:
- *
- * <ul>
- *
<li>2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
- * </ul>
- */
+                    // Filter field "g" in [200, 500] in SCHEMA_FIELDS cannot
filter old file 1 and
+                    // file 2 because the filter cannot be safely applied to
the old schema
List<Split> splits =
toSplits(
table.newSnapshotReader()
@@ -139,8 +133,12 @@ public abstract class ColumnTypeFileDataTestBase extends
SchemaEvolutionTableTes
List<InternalRow.FieldGetter> fieldGetterList =
getFieldGetterList(table);
assertThat(getResult(table.newRead(), splits,
fieldGetterList))
.containsExactlyInAnyOrder(
+ // old file1
+
"1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110",
+ // old file2
"2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210",
"2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310",
+ // normal filtered data
"2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410");
},
getPrimaryKeyNames(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
index 7f264fa817..e92ce9b49b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java
@@ -20,23 +20,16 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.stats.SimpleStatsEvolution;
-import org.apache.paimon.stats.SimpleStatsEvolutions;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.types.DataField;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -137,16 +130,8 @@ public abstract class ColumnTypeFileMetaTestBase extends
SchemaEvolutionTableTes
(files, schemas) -> {
FileStoreTable table = createFileStoreTable(schemas);
- /*
- Filter field "g" in [200, 500] in SCHEMA_FIELDS which is
updated from bigint
- to float and will get another file with one data as
followed:
-
- <ul>
-
<li>2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
- </ul>
-
- <p>Then we can check the results of the two result files.
- */
+                    // Filter field "g" in [200, 500], get old files 1 and 2
(the filter cannot be safely applied) and
+                    // new file 1
List<DataSplit> splits =
table.newSnapshotReader()
.withFilter(
@@ -154,7 +139,7 @@ public abstract class ColumnTypeFileMetaTestBase extends
SchemaEvolutionTableTes
.between(6, 200F, 500F))
.read()
.dataSplits();
- checkFilterRowCount(toDataFileMetas(splits), 3L);
+ checkFilterRowCount(toDataFileMetas(splits), 4L);
List<String> filesName =
files.stream().map(DataFileMeta::fileName).collect(Collectors.toList());
@@ -169,9 +154,6 @@ public abstract class ColumnTypeFileMetaTestBase extends
SchemaEvolutionTableTes
.map(DataFileMeta::fileName)
.collect(Collectors.toList()))
.containsAll(filesName);
-
- validateValuesWithNewSchema(
- schemas, table.schema().id(), filesName,
fileMetaList);
},
getPrimaryKeyNames(),
tableConfig,
@@ -220,125 +202,12 @@ public abstract class ColumnTypeFileMetaTestBase extends
SchemaEvolutionTableTes
.map(DataFileMeta::fileName)
.collect(Collectors.toList()))
.containsAll(filesName);
-
- // Compare all columns with table column type
- validateValuesWithNewSchema(
- schemas, table.schema().id(), filesName,
fileMetaList);
},
getPrimaryKeyNames(),
tableConfig,
this::createFileStoreTable);
}
- protected void validateValuesWithNewSchema(
- Map<Long, TableSchema> tableSchemas,
- long schemaId,
- List<String> filesName,
- List<DataFileMeta> fileMetaList) {
- Function<Long, List<DataField>> schemaFields = id ->
tableSchemas.get(id).fields();
- SimpleStatsEvolutions converters = new
SimpleStatsEvolutions(schemaFields, schemaId);
- for (DataFileMeta fileMeta : fileMetaList) {
- SimpleStats stats = getTableValueStats(fileMeta);
- SimpleStatsEvolution.Result result =
-
converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null);
- InternalRow min = result.minValues();
- InternalRow max = result.maxValues();
- assertThat(stats.minValues().getFieldCount()).isEqualTo(12);
- if (filesName.contains(fileMeta.fileName())) {
- checkTwoValues(min, max);
- } else {
- checkOneValue(min, max);
- }
- }
- }
-
- /**
- * Check file data with one data.
- *
- * <ul>
- * <li>data:
- *
2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
- * <li>types: a->int, b->varchar[10], c->varchar[10], d->double, e->int,
f->decimal,g->float,
- * h->double, i->decimal, j->date, k->date, l->varbinary
- * </ul>
- */
- private void checkOneValue(InternalRow min, InternalRow max) {
- assertThat(min.getInt(0)).isEqualTo(max.getInt(0)).isEqualTo(2);
- assertThat(min.getString(1))
- .isEqualTo(max.getString(1))
- .isEqualTo(BinaryString.fromString("400"));
- assertThat(min.getString(2))
- .isEqualTo(max.getString(2))
- .isEqualTo(BinaryString.fromString("401"));
-
assertThat(min.getDouble(3)).isEqualTo(max.getDouble(3)).isEqualTo(402D);
- assertThat(min.getInt(4)).isEqualTo(max.getInt(4)).isEqualTo(403);
- assertThat(min.getDecimal(5, 10, 2).toBigDecimal().intValue())
- .isEqualTo(max.getDecimal(5, 10, 2).toBigDecimal().intValue())
- .isEqualTo(404);
- assertThat(min.getFloat(6)).isEqualTo(max.getFloat(6)).isEqualTo(405F);
-
assertThat(min.getDouble(7)).isEqualTo(max.getDouble(7)).isEqualTo(406D);
- assertThat(min.getDecimal(8, 10, 2).toBigDecimal().doubleValue())
- .isEqualTo(max.getDecimal(8, 10,
2).toBigDecimal().doubleValue())
- .isEqualTo(407D);
- assertThat(min.getInt(9)).isEqualTo(max.getInt(9)).isEqualTo(408);
- assertThat(min.getInt(10)).isEqualTo(max.getInt(10)).isEqualTo(409);
- assertThat(min.isNullAt(11)).isEqualTo(max.isNullAt(11)).isTrue();
- }
-
- /**
- * Check file with new types and data.
- *
- * <ul>
- * <li>data1:
2,"200","201",toDecimal(202),(short)203,204,205L,206F,207D,208,toTimestamp(209 *
- * millsPerDay),toBytes("210")
- * <li>data2:
2,"300","301",toDecimal(302),(short)303,304,305L,306F,307D,308,toTimestamp(309 *
- * millsPerDay),toBytes("310")
- * <li>old types: a->int, b->char[10], c->varchar[10], d->decimal,
e->smallint, f->int,
- * g->bigint, h->float, i->double, j->date, k->timestamp, l->binary
- * <li>new types: a->int, b->varchar[10], c->varchar[10], d->double,
e->int,
- * f->decimal,g->float, h->double, i->decimal, j->date, k->date,
l->varbinary
- * </ul>
- */
- private void checkTwoValues(InternalRow min, InternalRow max) {
- assertThat(min.getInt(0)).isEqualTo(2);
- assertThat(max.getInt(0)).isEqualTo(2);
-
- // parquet does not support padding
- assertThat(min.getString(1).toString()).startsWith("200");
- assertThat(max.getString(1).toString()).startsWith("300");
-
- assertThat(min.getString(2)).isEqualTo(BinaryString.fromString("201"));
- assertThat(max.getString(2)).isEqualTo(BinaryString.fromString("301"));
-
- assertThat(min.getDouble(3)).isEqualTo(202D);
- assertThat(max.getDouble(3)).isEqualTo(302D);
-
- assertThat(min.getInt(4)).isEqualTo(203);
- assertThat(max.getInt(4)).isEqualTo(303);
-
- assertThat(min.getDecimal(5, 10,
2).toBigDecimal().intValue()).isEqualTo(204);
- assertThat(max.getDecimal(5, 10,
2).toBigDecimal().intValue()).isEqualTo(304);
-
- assertThat(min.getFloat(6)).isEqualTo(205F);
- assertThat(max.getFloat(6)).isEqualTo(305F);
-
- assertThat(min.getDouble(7)).isEqualTo(206D);
- assertThat(max.getDouble(7)).isEqualTo(306D);
-
- assertThat(min.getDecimal(8, 10,
2).toBigDecimal().doubleValue()).isEqualTo(207D);
- assertThat(max.getDecimal(8, 10,
2).toBigDecimal().doubleValue()).isEqualTo(307D);
-
- assertThat(min.getInt(9)).isEqualTo(208);
- assertThat(max.getInt(9)).isEqualTo(308);
-
- assertThat(min.getInt(10)).isEqualTo(209);
- assertThat(max.getInt(10)).isEqualTo(309);
-
- // Min and max value of binary type is null
- assertThat(min.isNullAt(11)).isTrue();
- assertThat(max.isNullAt(11)).isTrue();
- }
-
@Override
protected List<String> getPrimaryKeyNames() {
return SCHEMA_PRIMARY_KEYS;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
index 64bb5f21ab..066bd6c57d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
@@ -65,6 +65,8 @@ public class PrimaryKeyColumnTypeFileDataTest extends
ColumnTypeFileDataTestBase
(files, schemas) -> {
FileStoreTable table = createFileStoreTable(schemas);
+ // filter g: old file cannot apply filter, so the new file
in the same partition
+ // also cannot be filtered, the result contains all data
List<Split> splits =
toSplits(
table.newSnapshotReader()
@@ -77,6 +79,9 @@ public class PrimaryKeyColumnTypeFileDataTest extends
ColumnTypeFileDataTestBase
List<InternalRow.FieldGetter> fieldGetterList =
getFieldGetterList(table);
assertThat(getResult(table.newRead(), splits,
fieldGetterList))
.containsExactlyInAnyOrder(
+
"1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110",
+
"1|500|501|502.0|503|504.00|505.0|506.0|507.00|508|509|510",
+
"1|600|601|602.0|603|604.00|605.0|606.0|607.00|608|609|610",
"2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210",
"2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310",
"2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410");
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
index 32a4138be5..359d5e7b9d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
@@ -18,25 +18,19 @@
package org.apache.paimon.table;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
-import org.apache.paimon.stats.SimpleStatsEvolution;
-import org.apache.paimon.stats.SimpleStatsEvolutions;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.types.DataField;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Map;
-import java.util.function.Function;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -105,54 +99,12 @@ public class PrimaryKeyTableColumnTypeFileMetaTest extends
ColumnTypeFileMetaTes
.between(6, 200F, 500F))
.read()
.dataSplits();
- // filtered and only 3 rows left
- checkFilterRowCount(toDataFileMetas(splits), 3L);
+ // filter g: old file cannot apply filter, so the new file
in the same partition
+ // also cannot be filtered, the result contains all data
+ checkFilterRowCount(toDataFileMetas(splits), 6L);
},
getPrimaryKeyNames(),
tableConfig,
this::createFileStoreTable);
}
-
- /** We can only validate the values in primary keys for changelog with key
table. */
- @Override
- protected void validateValuesWithNewSchema(
- Map<Long, TableSchema> tableSchemas,
- long schemaId,
- List<String> filesName,
- List<DataFileMeta> fileMetaList) {
- Function<Long, List<DataField>> schemaFields =
- id ->
tableSchemas.get(id).logicalTrimmedPrimaryKeysType().getFields();
- SimpleStatsEvolutions converters = new
SimpleStatsEvolutions(schemaFields, schemaId);
- for (DataFileMeta fileMeta : fileMetaList) {
- SimpleStats stats = getTableValueStats(fileMeta);
- SimpleStatsEvolution.Result result =
-
converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null);
- InternalRow min = result.minValues();
- InternalRow max = result.maxValues();
- assertThat(min.getFieldCount()).isEqualTo(4);
- if (filesName.contains(fileMeta.fileName())) {
- // parquet does not support padding
- assertThat(min.getString(0).toString()).startsWith("200");
- assertThat(max.getString(0).toString()).startsWith("300");
-
-
assertThat(min.getString(1)).isEqualTo(BinaryString.fromString("201"));
-
assertThat(max.getString(1)).isEqualTo(BinaryString.fromString("301"));
-
- assertThat(min.getDouble(2)).isEqualTo(202D);
- assertThat(max.getDouble(2)).isEqualTo(302D);
-
- assertThat(min.getInt(3)).isEqualTo(203);
- assertThat(max.getInt(3)).isEqualTo(303);
- } else {
- assertThat(min.getString(0))
- .isEqualTo(max.getString(0))
- .isEqualTo(BinaryString.fromString("400"));
- assertThat(min.getString(1))
- .isEqualTo(max.getString(1))
- .isEqualTo(BinaryString.fromString("401"));
-
assertThat(min.getDouble(2)).isEqualTo(max.getDouble(2)).isEqualTo(402D);
-
assertThat(min.getInt(3)).isEqualTo(max.getInt(3)).isEqualTo(403);
- }
- }
- }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 5d20e46ec2..24955dc94a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -34,6 +35,7 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -405,6 +407,59 @@ public class SchemaEvolutionTest {
"_VALUE_KIND", SYSTEM_FIELD_NAMES)));
}
+ @Test
+ public void testPushDownEvolutionSafeFilter() throws Exception {
+ Schema schema =
+ new Schema(
+ RowType.of(DataTypes.INT(),
DataTypes.BIGINT()).getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonMap("write-only", "true"),
+ "");
+ schemaManager.createTable(schema);
+
+ FileStoreTable table =
FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser)) {
+ // file 1
+ write.write(GenericRow.of(1, 1L));
+ commit.commit(1, write.prepareCommit(false, 1));
+
+ // file 2
+ write.write(GenericRow.of(2, 2L));
+ commit.commit(2, write.prepareCommit(false, 2));
+ }
+
+ schemaManager.commitChanges(
+ Collections.singletonList(SchemaChange.updateColumnType("f0",
DataTypes.STRING())));
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath);
+
+ try (StreamTableWrite write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser)) {
+ // file 3
+ write.write(GenericRow.of(BinaryString.fromString("0"), 3L));
+ commit.commit(3, write.prepareCommit(false, 3));
+
+ // file 4
+ write.write(GenericRow.of(BinaryString.fromString("3"), 3L));
+ commit.commit(4, write.prepareCommit(false, 4));
+ }
+
+ PredicateBuilder builder = new
PredicateBuilder(table.schema().logicalRowType());
+ // p: f0 >= '3' && f1 >= 2L
+ Predicate p =
+ PredicateBuilder.and(
+ builder.greaterOrEqual(0,
BinaryString.fromString("3")),
+ builder.greaterOrEqual(1, 2L));
+ // file 1 will be filtered by f1 >= 2L
+ // file 2 won't be filtered because f0 >= '3' is not safe
+ // file 3 will be filtered by f0 >= '3'
+ // file 4 won't be filtered
+ List<String> rows = readRecords(table, p);
+ assertThat(rows).containsExactlyInAnyOrder("2, 2", "3, 3");
+ }
+
private List<String> readRecords(FileStoreTable table, Predicate filter)
throws IOException {
List<String> results = new ArrayList<>();
forEachRemaining(
@@ -423,7 +478,9 @@ public class SchemaEvolutionTest {
if (filter != null) {
snapshotReader.withFilter(filter);
}
- for (Split split : snapshotReader.read().dataSplits()) {
+ List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
+
+ for (Split split : dataSplits) {
InnerTableRead read = table.newRead();
if (filter != null) {
read.withFilter(filter);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
index 3b12ceabe2..73ea8f0ae1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
@@ -230,4 +230,30 @@ public class FilterPushdownWithSchemaChangeITCase extends
CatalogITCaseBase {
assertThat(sql("SELECT * FROM T WHERE f =
1")).containsExactly(Row.of(1, 1));
assertThat(sql("SELECT * FROM T WHERE f <>
1")).containsExactly(Row.of(2, 111));
}
+
+ @TestTemplate
+ public void testAppendStringToNumericForStatsFilter() {
+ sql("CREATE TABLE T (a STRING)");
+ sql("INSERT INTO T VALUES ('9'), ('10'), ('11')");
+ sql("ALTER TABLE T MODIFY (a INT)");
+
+ assertThat(sql("SELECT * FROM T WHERE a > 9"))
+ .containsExactlyInAnyOrder(Row.of(10), Row.of(11));
+ }
+
+ @TestTemplate
+ public void testPrimaryStringToNumericForStatsFilter() {
+ sql("CREATE TABLE T (pk STRING PRIMARY KEY NOT ENFORCED, v STRING)");
+ sql("INSERT INTO T VALUES ('9', '9'), ('10', '10'), ('11', '11')");
+
+ // key filter
+ sql("ALTER TABLE T MODIFY (pk INT)");
+ assertThat(sql("SELECT * FROM T WHERE pk > 9"))
+ .containsExactlyInAnyOrder(Row.of(10, "10"), Row.of(11, "11"));
+
+ // value filter
+ sql("ALTER TABLE T MODIFY (v INT)");
+ assertThat(sql("SELECT * FROM T WHERE v > 9"))
+ .containsExactlyInAnyOrder(Row.of(10, 10), Row.of(11, 11));
+ }
}