This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6298f04bb9 [core] Enable manifest filter in data evolution table (#6455)
6298f04bb9 is described below
commit 6298f04bb95788b93db15604369e6ee8c2fe0f8b
Author: YeJunHao <[email protected]>
AuthorDate: Sun Oct 26 13:58:45 2025 +0800
[core] Enable manifest filter in data evolution table (#6455)
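
    Previously, DataEvolutionFileStoreScan#withFilter dropped the given predicate,
    so manifest entries of data evolution tables were never pruned by stats. Record
    the filter, group manifest entries that share a firstRowId range, stitch their
    per-file stats into a combined view, and test the predicate against the merged
    min/max/null-count stats.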
---
.../apache/paimon/reader/DataEvolutionArray.java | 228 +++++++++++++++++++++
.../org/apache/paimon/reader/DataEvolutionRow.java | 13 ++
.../paimon/operation/AbstractFileStoreScan.java | 6 +-
.../paimon/operation/AppendOnlyFileStoreScan.java | 8 +-
.../operation/DataEvolutionFileStoreScan.java | 117 +++++++++++
.../paimon/operation/KeyValueFileStoreScan.java | 5 +
.../table/source/DataEvolutionSplitGenerator.java | 71 +++++--
.../paimon/table/DataEvolutionTableTest.java | 8 +
8 files changed, 433 insertions(+), 23 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
new file mode 100644
index 0000000000..7a14a3ef9c
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.reader;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.variant.Variant;
+
+/** An {@link InternalArray} which is made up of several underlying arrays. */
+public class DataEvolutionArray implements InternalArray {
+
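+    // For each logical field position, rowOffsets[pos] selects the backing array in `rows`
+    // (-1 means the field is always null) and fieldOffsets[pos] is the index inside it.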
+ private final InternalArray[] rows;
+ private final int[] rowOffsets;
+ private final int[] fieldOffsets;
+
+    public DataEvolutionArray(int rowNumber, int[] rowOffsets, int[] fieldOffsets) {
+ this.rows = new InternalArray[rowNumber];
+ this.rowOffsets = rowOffsets;
+ this.fieldOffsets = fieldOffsets;
+ }
+
+ public void setRow(int pos, InternalArray row) {
+ if (pos >= rows.length) {
+ throw new IndexOutOfBoundsException(
+ "Position " + pos + " is out of bounds for rows size " +
rows.length);
+ } else {
+ rows[pos] = row;
+ }
+ }
+
+ public void setRows(InternalArray[] rows) {
+ if (rows.length != this.rows.length) {
+ throw new IllegalArgumentException(
+ "The length of input rows "
+ + rows.length
+ + " is not equal to the expected length "
+ + this.rows.length);
+ }
+ for (int i = 0; i < rows.length; i++) {
+ setRow(i, rows[i]);
+ }
+ }
+
+ private InternalArray chooseArray(int pos) {
+ return rows[(rowOffsets[pos])];
+ }
+
+ private int offsetInRow(int pos) {
+ return fieldOffsets[pos];
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
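+        // A row offset of -1 means no backing array supplies this field.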
+ if (rowOffsets[pos] == -1) {
+ return true;
+ }
+ return chooseArray(pos).isNullAt(offsetInRow(pos));
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ return chooseArray(pos).getBoolean(offsetInRow(pos));
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ return chooseArray(pos).getByte(offsetInRow(pos));
+ }
+
+ @Override
+ public short getShort(int pos) {
+ return chooseArray(pos).getShort(offsetInRow(pos));
+ }
+
+ @Override
+ public int getInt(int pos) {
+ return chooseArray(pos).getInt(offsetInRow(pos));
+ }
+
+ @Override
+ public long getLong(int pos) {
+ return chooseArray(pos).getLong(offsetInRow(pos));
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ return chooseArray(pos).getFloat(offsetInRow(pos));
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ return chooseArray(pos).getDouble(offsetInRow(pos));
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ return chooseArray(pos).getString(offsetInRow(pos));
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ return chooseArray(pos).getDecimal(offsetInRow(pos), precision, scale);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ return chooseArray(pos).getTimestamp(offsetInRow(pos), precision);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ return chooseArray(pos).getBinary(offsetInRow(pos));
+ }
+
+ @Override
+ public Variant getVariant(int pos) {
+ return chooseArray(pos).getVariant(offsetInRow(pos));
+ }
+
+ @Override
+ public Blob getBlob(int pos) {
+ return chooseArray(pos).getBlob(offsetInRow(pos));
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ return chooseArray(pos).getArray(offsetInRow(pos));
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ return chooseArray(pos).getMap(offsetInRow(pos));
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ return chooseArray(pos).getRow(offsetInRow(pos), numFields);
+ }
+
+ @Override
+ public int size() {
+ return rowOffsets.length;
+ }
+
+ @Override
+ public boolean[] toBooleanArray() {
+ boolean[] result = new boolean[rowOffsets.length];
+ for (int i = 0; i < rowOffsets.length; i++) {
+ result[i] = getBoolean(i);
+ }
+ return result;
+ }
+
+ @Override
+ public byte[] toByteArray() {
+ byte[] result = new byte[rowOffsets.length];
+ for (int i = 0; i < rowOffsets.length; i++) {
+ result[i] = getByte(i);
+ }
+ return result;
+ }
+
+ @Override
+ public short[] toShortArray() {
+ short[] result = new short[rowOffsets.length];
+ for (int i = 0; i < rowOffsets.length; i++) {
+ result[i] = getShort(i);
+ }
+ return result;
+ }
+
+ @Override
+ public int[] toIntArray() {
+ int[] result = new int[rowOffsets.length];
+ for (int i = 0; i < rowOffsets.length; i++) {
+ result[i] = getInt(i);
+ }
+ return result;
+ }
+
+ @Override
+ public long[] toLongArray() {
+ long[] result = new long[rowOffsets.length];
+ for (int i = 0; i < rowOffsets.length; i++) {
+ result[i] = getLong(i);
+ }
+ return result;
+ }
+
+ @Override
+ public float[] toFloatArray() {
+ float[] result = new float[rowOffsets.length];
+ for (int i = 0; i < rowOffsets.length; i++) {
+ result[i] = getFloat(i);
+ }
+ return result;
+ }
+
+ @Override
+ public double[] toDoubleArray() {
+ double[] result = new double[rowOffsets.length];
+ for (int i = 0; i < rowOffsets.length; i++) {
+ result[i] = getDouble(i);
+ }
+ return result;
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
index ce65026390..824a0c438d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
@@ -58,6 +58,19 @@ public class DataEvolutionRow implements InternalRow {
}
}
+ public void setRows(InternalRow[] rows) {
+ if (rows.length != this.rows.length) {
+ throw new IllegalArgumentException(
+ "The length of input rows "
+ + rows.length
+ + " is not equal to the expected length "
+ + this.rows.length);
+ }
+ for (int i = 0; i < rows.length; i++) {
+ setRow(i, rows[i]);
+ }
+ }
+
private InternalRow chooseRow(int pos) {
return rows[(rowOffsets[pos])];
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index b21c77db1d..0e3f39715d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -74,7 +74,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
private final ConcurrentMap<Long, TableSchema> tableSchemas;
private final SchemaManager schemaManager;
- private final TableSchema schema;
+ protected final TableSchema schema;
private Snapshot specifiedSnapshot = null;
private boolean onlyReadRealBuckets = false;
@@ -255,6 +255,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
files.add(iterator.next());
}
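+        // Give subclasses a chance to filter the collected entries as a whole
+        // (e.g. DataEvolutionFileStoreScan groups entries by row range first).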
+ files = postFilter(files);
+
if (wholeBucketFilterEnabled()) {
             // We group files by bucket here, and filter them by the whole bucket filter.
             // Why do this: because in primary key table, we can't just filter the value
@@ -422,6 +424,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
/** Note: Keep this thread-safe. */
protected abstract boolean filterByStats(ManifestEntry entry);
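+    /** Filters the scanned entries as a whole, after the per-entry {@code filterByStats}. */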
+    protected abstract List<ManifestEntry> postFilter(List<ManifestEntry> entries);
+
protected boolean wholeBucketFilterEnabled() {
return false;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 21d9d3880d..be5c48539d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -33,6 +33,7 @@ import org.apache.paimon.utils.SnapshotManager;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,7 +45,7 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
private final boolean fileIndexReadEnabled;
- private Predicate inputFilter;
+ protected Predicate inputFilter;
// cache not evolved filter by schema id
     private final Map<Long, Predicate> notEvolvedFilterMapping = new ConcurrentHashMap<>();
@@ -121,6 +122,11 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
return testFileIndex(entry.file().embeddedIndex(), entry);
}
+ @Override
+ protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+ return entries;
+ }
+
     private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry) {
if (embeddedIndexBytes == null) {
return true;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index fad67e3183..03f36d2ea0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -18,16 +18,33 @@
package org.apache.paimon.operation;
+import org.apache.paimon.data.BinaryArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.DataEvolutionArray;
+import org.apache.paimon.reader.DataEvolutionRow;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsEvolution;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.SnapshotManager;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
/** {@link FileStoreScan} for data-evolution enabled table. */
public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
+ private boolean dropStats = false;
+
public DataEvolutionFileStoreScan(
ManifestsReader manifestsReader,
BucketSelectConverter bucketSelectConverter,
@@ -47,10 +64,110 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
false);
}
+ @Override
+ public FileStoreScan dropStats() {
+ this.dropStats = true;
+ return this;
+ }
+
+ @Override
+ public FileStoreScan keepStats() {
+ this.dropStats = false;
+ return this;
+ }
+
public DataEvolutionFileStoreScan withFilter(Predicate predicate) {
+ this.inputFilter = predicate;
return this;
}
+ @Override
+ protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+ if (inputFilter == null) {
+ return entries;
+ }
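+        // Entries that share a firstRowId range describe the same rows (data evolution
+        // writes different columns to different files), so evaluate the filter per group.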
+        List<List<ManifestEntry>> splitByRowId =
+                DataEvolutionSplitGenerator.splitManifests(entries);
+
+ return splitByRowId.stream()
+ .filter(this::filterByStats)
+ .flatMap(Collection::stream)
+ .map(entry -> dropStats ? dropStats(entry) : entry)
+ .collect(Collectors.toList());
+ }
+
+ private boolean filterByStats(List<ManifestEntry> metas) {
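+        // All files in a group cover the same row range, so the first file's row count applies.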
+ long rowCount = metas.get(0).file().rowCount();
+ SimpleStatsEvolution.Result evolutionResult = evolutionStats(metas);
+ return inputFilter.test(
+ rowCount,
+ evolutionResult.minValues(),
+ evolutionResult.maxValues(),
+ evolutionResult.nullCounts());
+ }
+
+    private SimpleStatsEvolution.Result evolutionStats(List<ManifestEntry> metas) {
+        int[] allFields = schema.fields().stream().mapToInt(DataField::id).toArray();
+ int fieldsCount = schema.fields().size();
+ int[] rowOffsets = new int[fieldsCount];
+ int[] fieldOffsets = new int[fieldsCount];
+ Arrays.fill(rowOffsets, -1);
+ Arrays.fill(fieldOffsets, -1);
+
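+        // Collect each file's min/max/null-count stats; they are stitched into a single
+        // evolved view below.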
+ InternalRow[] min = new InternalRow[metas.size()];
+ InternalRow[] max = new InternalRow[metas.size()];
+ BinaryArray[] nullCounts = new BinaryArray[metas.size()];
+
+ for (int i = 0; i < metas.size(); i++) {
+ SimpleStats stats = metas.get(i).file().valueStats();
+ min[i] = stats.minValues();
+ max[i] = stats.maxValues();
+ nullCounts[i] = stats.nullCounts();
+ }
+
+ for (int i = 0; i < metas.size(); i++) {
+ DataFileMeta fileMeta = metas.get(i).file();
+ TableSchema dataFileSchema =
+ scanTableSchema(fileMeta.schemaId())
+ .project(
+ fileMeta.valueStatsCols() == null
+ ? fileMeta.writeCols()
+ : fileMeta.valueStatsCols());
+            int[] fieldIds =
+                    SpecialFields.rowTypeWithRowTracking(dataFileSchema.logicalRowType())
+                            .getFields().stream()
+                            .mapToInt(DataField::id)
+                            .toArray();
+
+ int count = 0;
+ for (int j = 0; j < fieldsCount; j++) {
+ for (int fieldId : fieldIds) {
+ if (allFields[j] == fieldId) {
+                        // TODO: If the types do not match (e.g. int -> string), we need to skip
+                        // this field and always set rowOffsets[j] = -1 (perhaps -2 first, then
+                        // reset to -1 at the end), because schema evolution may change a field's
+                        // type, e.g. from int to string.
+ if (rowOffsets[j] == -1) {
+ rowOffsets[j] = i;
+ fieldOffsets[j] = count++;
+ }
+ break;
+ }
+ }
+ }
+ }
+
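+        // Each logical field resolves to the first file in the group that wrote it; fields
+        // no file provides keep rowOffsets[j] == -1 and read as null.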
+        DataEvolutionRow finalMin = new DataEvolutionRow(metas.size(), rowOffsets, fieldOffsets);
+        DataEvolutionRow finalMax = new DataEvolutionRow(metas.size(), rowOffsets, fieldOffsets);
+ DataEvolutionArray finalNullCounts =
+ new DataEvolutionArray(metas.size(), rowOffsets, fieldOffsets);
+
+ finalMin.setRows(min);
+ finalMax.setRows(max);
+ finalNullCounts.setRows(nullCounts);
+        return new SimpleStatsEvolution.Result(finalMin, finalMax, finalNullCounts);
+ }
+
/** Note: Keep this thread-safe. */
@Override
protected boolean filterByStats(ManifestEntry entry) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 1de22d4997..38bb6cdeef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -153,6 +153,11 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan {
                 file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts());
}
+ @Override
+ protected List<ManifestEntry> postFilter(List<ManifestEntry> entries) {
+ return entries;
+ }
+
@Override
protected ManifestEntry dropStats(ManifestEntry entry) {
if (!isValueFilterEnabled() && wholeBucketFilterEnabled()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
index 05bd3ae4ed..7035bf4f49 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataEvolutionSplitGenerator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.utils.BinPacking;
import java.util.ArrayList;
@@ -77,39 +78,63 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
}
public static List<List<DataFileMeta>> split(List<DataFileMeta> files) {
- List<List<DataFileMeta>> splitByRowId = new ArrayList<>();
+ return split(
+ files,
+ DataFileMeta::fileName,
+ DataFileMeta::firstRowId,
+ DataFileMeta::rowCount,
+ DataFileMeta::maxSequenceNumber);
+ }
+
+    public static List<List<ManifestEntry>> splitManifests(List<ManifestEntry> entries) {
+ return split(
+ entries,
+ entry -> entry.file().fileName(),
+ entry -> entry.file().firstRowId(),
+ entry -> entry.file().rowCount(),
+ entry -> entry.file().maxSequenceNumber());
+ }
+
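+    /**
+     * Splits files into groups that share the same firstRowId range; the accessor functions
+     * let both {@link DataFileMeta} and {@link ManifestEntry} reuse the same logic.
+     */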
+ public static <T> List<List<T>> split(
+ List<T> files,
+ Function<T, String> fileNameF,
+ Function<T, Long> firstRowIdF,
+ Function<T, Long> rowCountF,
+ Function<T, Long> maxSequenceNumberF) {
+ List<List<T>> splitByRowId = new ArrayList<>();
// Sort files by firstRowId and then by maxSequenceNumber
files.sort(
Comparator.comparingLong(
-                                (ToLongFunction<DataFileMeta>)
+                                (ToLongFunction<T>)
                                         value ->
-                                                value.firstRowId() == null
+                                                firstRowIdF.apply(value) == null
                                                         ? Long.MIN_VALUE
-                                                        : value.firstRowId())
-                        .thenComparingInt(f -> isBlobFile(f.fileName()) ? 1 : 0)
+                                                        : firstRowIdF.apply(value))
+                        .thenComparingInt(f -> isBlobFile(fileNameF.apply(f)) ? 1 : 0)
                         .thenComparing(
                                 (f1, f2) -> {
                                     // If firstRowId is the same, we should read the file with
                                     // larger sequence number first. Because larger sequence number
                                     // file is more fresh
                                     return Long.compare(
-                                            f2.maxSequenceNumber(), f1.maxSequenceNumber());
+                                            maxSequenceNumberF.apply(f2),
+                                            maxSequenceNumberF.apply(f1));
                                 }));
- files = filterBlob(files);
+ files = filterBlob(files, fileNameF, firstRowIdF, rowCountF);
// Split files by firstRowId
long lastRowId = -1;
long checkRowIdStart = 0;
- List<DataFileMeta> currentSplit = new ArrayList<>();
+ List<T> currentSplit = new ArrayList<>();
for (int i = 0; i < files.size(); i++) {
- DataFileMeta file = files.get(i);
- Long firstRowId = file.firstRowId();
+ T file = files.get(i);
+ Long firstRowId = firstRowIdF.apply(file);
if (firstRowId == null) {
splitByRowId.add(Collections.singletonList(file));
continue;
}
- if (!isBlobFile(file.fileName()) && firstRowId != lastRowId) {
+            if (!isBlobFile(fileNameF.apply(file)) && firstRowId != lastRowId) {
if (!currentSplit.isEmpty()) {
splitByRowId.add(currentSplit);
}
@@ -118,13 +143,13 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
String.format(
"There are overlapping files in the split:
\n %s, the wrong file is: \n %s",
files.subList(Math.max(0, i - 20),
i).stream()
- .map(DataFileMeta::toString)
+ .map(Object::toString)
.collect(Collectors.joining(",")),
file));
}
currentSplit = new ArrayList<>();
lastRowId = firstRowId;
- checkRowIdStart = firstRowId + file.rowCount();
+ checkRowIdStart = firstRowId + rowCountF.apply(file);
}
currentSplit.add(file);
}
@@ -135,21 +160,25 @@ public class DataEvolutionSplitGenerator implements SplitGenerator {
return splitByRowId;
}
- private static List<DataFileMeta> filterBlob(List<DataFileMeta> files) {
- List<DataFileMeta> result = new ArrayList<>();
+ private static <T> List<T> filterBlob(
+ List<T> files,
+ Function<T, String> fileNameF,
+ Function<T, Long> firstRowIdF,
+ Function<T, Long> rowCountF) {
+ List<T> result = new ArrayList<>();
long rowIdStart = -1;
long rowIdEnd = -1;
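+        // A blob file is kept only when its firstRowId falls inside the row range of the
+        // preceding non-blob file; otherwise it belongs to no retained data file.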
- for (DataFileMeta file : files) {
- if (file.firstRowId() == null) {
+ for (T file : files) {
+ if (firstRowIdF.apply(file) == null) {
result.add(file);
continue;
}
- if (!isBlobFile(file.fileName())) {
- rowIdStart = file.firstRowId();
- rowIdEnd = file.firstRowId() + file.rowCount();
+ if (!isBlobFile(fileNameF.apply(file))) {
+ rowIdStart = firstRowIdF.apply(file);
+ rowIdEnd = firstRowIdF.apply(file) + rowCountF.apply(file);
result.add(file);
} else {
-                if (file.firstRowId() >= rowIdStart && file.firstRowId() < rowIdEnd) {
+                if (firstRowIdF.apply(file) >= rowIdStart && firstRowIdF.apply(file) < rowIdEnd) {
result.add(file);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index dc4db3da5b..6ddc9b410d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -417,6 +417,14 @@ public class DataEvolutionTableTest extends TableTestBase {
readBuilder.withFilter(predicate);
         assertThat(((DataSplit) readBuilder.newScan().plan().splits().get(0)).dataFiles().size())
.isEqualTo(2);
+
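+        // The manifest filter can now prune entire plans for data evolution tables.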
+ predicate = predicateBuilder.notEqual(1, BinaryString.fromString("a"));
+ readBuilder.withFilter(predicate);
+ assertThat(readBuilder.newScan().plan().splits().isEmpty()).isTrue();
+
+ predicate = predicateBuilder.notEqual(2, BinaryString.fromString("c"));
+ readBuilder.withFilter(predicate);
+ assertThat(readBuilder.newScan().plan().splits().isEmpty()).isTrue();
}
protected Schema schemaDefault() {