This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new af0d4162e8 [core] Refactory evolved and not evolved filters in file
store scan (#5934)
af0d4162e8 is described below
commit af0d4162e87038baaa08fd9e76d87a8fcebb59e4
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 22 17:53:22 2025 +0800
[core] Refactory evolved and not evolved filters in file store scan (#5934)
---
.../paimon/operation/AppendOnlyFileStoreScan.java | 46 ++++++++++++++------
.../paimon/operation/KeyValueFileStoreScan.java | 44 +++++++++++++------
.../apache/paimon/schema/SchemaEvolutionUtil.java | 49 +++++++---------------
.../apache/paimon/stats/SimpleStatsEvolutions.java | 38 ++++++++++++++---
.../apache/paimon/utils/FormatReaderMapping.java | 2 +-
.../paimon/schema/SchemaEvolutionUtilTest.java | 2 +-
6 files changed, 114 insertions(+), 67 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 0cf3dbdb69..21d9d3880d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -44,10 +44,13 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
private final boolean fileIndexReadEnabled;
- private Predicate filter;
+ private Predicate inputFilter;
- // just cache.
- private final Map<Long, Predicate> dataFilterMapping = new
ConcurrentHashMap<>();
+ // cache not evolved filter by schema id
+ private final Map<Long, Predicate> notEvolvedFilterMapping = new
ConcurrentHashMap<>();
+
+ // cache evolved filter by schema id
+ private final Map<Long, Predicate> evolvedFilterMapping = new
ConcurrentHashMap<>();
public AppendOnlyFileStoreScan(
ManifestsReader manifestsReader,
@@ -72,7 +75,7 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
}
public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
- this.filter = predicate;
+ this.inputFilter = predicate;
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
return this;
}
@@ -80,9 +83,15 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
- Predicate safeFilter =
-
simpleStatsEvolutions.toEvolutionSafeStatsFilter(entry.file().schemaId(),
filter);
- if (safeFilter == null) {
+ Predicate notEvolvedFilter =
+ notEvolvedFilterMapping.computeIfAbsent(
+ entry.file().schemaId(),
+ id ->
+ // keepNewFieldFilter to handle add field
+ // for example, add field 'c', 'c > 3': old
files can be filtered
+ simpleStatsEvolutions.filterUnsafeFilter(
+ entry.file().schemaId(), inputFilter,
true));
+ if (notEvolvedFilter == null) {
return true;
}
@@ -93,12 +102,23 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
entry.file().rowCount(),
entry.file().valueStatsCols());
- return safeFilter.test(
+ // filter by min max
+ boolean result =
+ notEvolvedFilter.test(
entry.file().rowCount(),
stats.minValues(),
stats.maxValues(),
- stats.nullCounts())
- && (!fileIndexReadEnabled ||
testFileIndex(entry.file().embeddedIndex(), entry));
+ stats.nullCounts());
+
+ if (!result) {
+ return false;
+ }
+
+ if (!fileIndexReadEnabled) {
+ return true;
+ }
+
+ return testFileIndex(entry.file().embeddedIndex(), entry);
}
private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes,
ManifestEntry entry) {
@@ -109,11 +129,11 @@ public class AppendOnlyFileStoreScan extends
AbstractFileStoreScan {
RowType dataRowType =
scanTableSchema(entry.file().schemaId()).logicalRowType();
Predicate dataPredicate =
- dataFilterMapping.computeIfAbsent(
+ evolvedFilterMapping.computeIfAbsent(
entry.file().schemaId(),
id ->
-
simpleStatsEvolutions.toEvolutionSafeStatsFilter(
- entry.file().schemaId(), filter));
+ simpleStatsEvolutions.tryDevolveFilter(
+ entry.file().schemaId(), inputFilter));
try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index a1b364679b..1de22d4997 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -61,9 +61,17 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
private Predicate keyFilter;
private Predicate valueFilter;
- private final Map<Long, Predicate> schemaId2DataFilter = new
ConcurrentHashMap<>();
private boolean valueFilterForceEnabled = false;
+ // cache not evolved key filter by schema id
+ private final Map<Long, Predicate> notEvolvedKeyFilterMapping = new
ConcurrentHashMap<>();
+
+ // cache not evolved value filter by schema id
+ private final Map<Long, Predicate> notEvolvedValueFilterMapping = new
ConcurrentHashMap<>();
+
+ // cache evolved filter by schema id
+ private final Map<Long, Predicate> evolvedValueFilterMapping = new
ConcurrentHashMap<>();
+
public KeyValueFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -124,10 +132,15 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
return false;
}
- Predicate safeKeyFilter =
- fieldKeyStatsConverters.toEvolutionSafeStatsFilter(
- entry.file().schemaId(), keyFilter);
- if (safeKeyFilter == null) {
+ Predicate notEvolvedFilter =
+ notEvolvedKeyFilterMapping.computeIfAbsent(
+ entry.file().schemaId(),
+ id ->
+ // keepNewFieldFilter to handle add field
+ // for example, add field 'c', 'c > 3': old
files can be filtered
+ fieldKeyStatsConverters.filterUnsafeFilter(
+ entry.file().schemaId(), keyFilter,
true));
+ if (notEvolvedFilter == null) {
return true;
}
@@ -136,7 +149,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
fieldKeyStatsConverters
.getOrCreate(file.schemaId())
.evolution(file.keyStats(), file.rowCount(), null);
- return safeKeyFilter.test(
+ return notEvolvedFilter.test(
file.rowCount(), stats.minValues(), stats.maxValues(),
stats.nullCounts());
}
@@ -157,10 +170,10 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
try (FileIndexPredicate predicate =
new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
Predicate dataPredicate =
- schemaId2DataFilter.computeIfAbsent(
+ evolvedValueFilterMapping.computeIfAbsent(
entry.file().schemaId(),
id ->
-
fieldValueStatsConverters.toEvolutionSafeStatsFilter(
+ fieldValueStatsConverters.tryDevolveFilter(
entry.file().schemaId(),
valueFilter));
return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
@@ -229,10 +242,15 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
return ((FilteredManifestEntry) entry).selected();
}
- Predicate safeValueFilter =
- fieldValueStatsConverters.toEvolutionSafeStatsFilter(
- entry.file().schemaId(), valueFilter);
- if (safeValueFilter == null) {
+ Predicate notEvolvedFilter =
+ notEvolvedValueFilterMapping.computeIfAbsent(
+ entry.file().schemaId(),
+ id ->
+ // keepNewFieldFilter to handle add field
+ // for example, add field 'c', 'c > 3': old
files can be filtered
+ fieldValueStatsConverters.filterUnsafeFilter(
+ entry.file().schemaId(), valueFilter,
true));
+ if (notEvolvedFilter == null) {
return true;
}
@@ -241,7 +259,7 @@ public class KeyValueFileStoreScan extends
AbstractFileStoreScan {
fieldValueStatsConverters
.getOrCreate(file.schemaId())
.evolution(file.valueStats(), file.rowCount(),
file.valueStatsCols());
- return safeValueFilter.test(
+ return notEvolvedFilter.test(
file.rowCount(),
result.minValues(),
result.maxValues(),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index e7e4831386..a9075fb6f2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -42,6 +42,7 @@ import org.apache.paimon.utils.ProjectedRow;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -131,18 +132,17 @@ public class SchemaEvolutionUtil {
* @param tableFields the table fields
* @param dataFields the underlying data fields
* @param filters the filters
- * @param forData true if devolve the filters for filtering data file,
otherwise, for filtering
- * manifest entry
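+ * @param keepNewFieldFilter true to keep, unchanged, predicates on fields that do not exist in
+ *     the data fields (e.g. newly added columns); callers must then handle such predicates
+ *     properly. If false, those predicates are dropped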
* @return the data filters
*/
- @Nullable
public static List<Predicate> devolveFilters(
List<DataField> tableFields,
List<DataField> dataFields,
List<Predicate> filters,
- boolean forData) {
+ boolean keepNewFieldFilter) {
if (filters == null) {
- return null;
+ return Collections.emptyList();
}
Map<String, DataField> nameToTableFields =
@@ -159,36 +159,19 @@ public class SchemaEvolutionUtil {
String.format("Find no field %s",
predicate.fieldName()));
DataField dataField = idToDataFields.get(tableField.id());
if (dataField == null) {
- // For example, add field b and filter b, the filter
is safe for old file
- // meta without field b because the index mapping
array can handle null
- return forData ? Optional.empty() :
Optional.of(predicate);
+ return keepNewFieldFilter ? Optional.of(predicate) :
Optional.empty();
}
- Optional<List<Object>> castedLiterals =
- CastExecutors.castLiteralsWithEvolution(
- predicate.literals(), predicate.type(),
dataField.type());
-
- // unsafe
- if (!castedLiterals.isPresent()) {
- return Optional.empty();
- }
-
- if (forData) {
- // For data, the filter will be pushdown to data file,
so must use the index
- // and literal type of data file
- return Optional.of(
- new LeafPredicate(
- predicate.function(),
- dataField.type(),
- indexOf(dataField, idToDataFields),
- dataField.name(),
- castedLiterals.get()));
- } else {
- // For meta, the index mapping array will map the
index the cast the
- // literals, so just return self
- // In other words, return it if it's safe
- return Optional.of(predicate);
- }
+ return CastExecutors.castLiteralsWithEvolution(
+ predicate.literals(), predicate.type(),
dataField.type())
+ .map(
+ literals ->
+ new LeafPredicate(
+ predicate.function(),
+ dataField.type(),
+ indexOf(dataField,
idToDataFields),
+ dataField.name(),
+ literals));
};
for (Predicate predicate : filters) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
index eb44ea706c..682728f9a0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
+++
b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java
@@ -21,20 +21,21 @@ package org.apache.paimon.stats;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.IndexCastMapping;
-import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import static java.util.Collections.singletonList;
import static
org.apache.paimon.schema.SchemaEvolutionUtil.createIndexCastMapping;
+import static org.apache.paimon.schema.SchemaEvolutionUtil.devolveFilters;
/** Converters to create col stats array serializer. */
public class SimpleStatsEvolutions {
@@ -82,7 +83,7 @@ public class SimpleStatsEvolutions {
* filter or null if can't.
*/
@Nullable
- public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable
Predicate filter) {
+ public Predicate tryDevolveFilter(long dataSchemaId, @Nullable Predicate
filter) {
if (filter == null || dataSchemaId == tableSchemaId) {
return filter;
}
@@ -91,13 +92,38 @@ public class SimpleStatsEvolutions {
// compute engine to perform p2.
List<Predicate> filters = PredicateBuilder.splitAnd(filter);
List<Predicate> devolved =
- Objects.requireNonNull(
- SchemaEvolutionUtil.devolveFilters(
- tableDataFields,
schemaFields.apply(dataSchemaId), filters, false));
+ devolveFilters(tableDataFields,
schemaFields.apply(dataSchemaId), filters, false);
return devolved.isEmpty() ? null : PredicateBuilder.and(devolved);
}
+ /**
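+ * Keeps only the predicates that are safe to evaluate against the stats of a file written with
+ * an older schema. For example, for filter 'a > 9' where 'a' changed from STRING to INT, an old
+ * file containing 9, 10 and 11 has string stats min '10' and max '9', so evaluating the filter
+ * on those stats could wrongly skip the file; such predicates are dropped. Predicates on fields
+ * missing from the old schema are kept only if {@code keepNewFieldFilter} is true. Returns null
+ * if no predicate remains.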
+ */
+ @Nullable
+ public Predicate filterUnsafeFilter(
+ long dataSchemaId, @Nullable Predicate filter, boolean
keepNewFieldFilter) {
+ if (filter == null || dataSchemaId == tableSchemaId) {
+ return filter;
+ }
+
+ List<Predicate> filters = PredicateBuilder.splitAnd(filter);
+ List<DataField> oldSchema = schemaFields.apply(dataSchemaId);
+ List<Predicate> result = new ArrayList<>();
+ for (Predicate predicate : filters) {
+ if (!devolveFilters(
+ tableDataFields,
+ oldSchema,
+ singletonList(predicate),
+ keepNewFieldFilter)
+ .isEmpty()) {
+ result.add(predicate);
+ }
+ }
+ return result.isEmpty() ? null : PredicateBuilder.and(result);
+ }
+
public List<DataField> tableDataFields() {
return tableDataFields;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 1c89971742..341199d1bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -309,7 +309,7 @@ public class FormatReaderMapping {
tableSchema.id() == dataSchema.id()
? filters
: SchemaEvolutionUtil.devolveFilters(
- tableSchema.fields(), dataSchema.fields(),
filters, true);
+ tableSchema.fields(), dataSchema.fields(),
filters, false);
// Skip pushing down partition filters to reader.
return excludePredicateWithFields(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
index 2883b36b5c..9466e69db3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java
@@ -91,7 +91,7 @@ public class SchemaEvolutionUtilTest {
IsNull.INSTANCE, DataTypes.INT(), 7, "a",
Collections.emptyList()));
List<Predicate> filters =
- SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields,
predicates, true);
+ SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields,
predicates, false);
assertThat(filters).isNotNull();
assertThat(filters.size()).isEqualTo(1);