This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d5723fc9a0 [core] should not push topN with file schema evolution (#6085)
d5723fc9a0 is described below
commit d5723fc9a02bd3b5d256efa045021137bb5c8532
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Aug 18 15:38:04 2025 +0800
[core] should not push topN with file schema evolution (#6085)
---
.../java/org/apache/paimon/schema/TableSchema.java | 26 ++++--
.../org/apache/paimon/AppendOnlyFileStore.java | 3 +-
.../java/org/apache/paimon/KeyValueFileStore.java | 3 +-
.../org/apache/paimon/io/FileIndexEvaluator.java | 4 +-
.../apache/paimon/operation/RawFileSplitRead.java | 9 +-
.../apache/paimon/utils/FormatReaderMapping.java | 22 ++++-
.../paimon/table/AppendOnlySimpleTableTest.java | 95 ++++++++++++++++++----
.../flink/source/TestChangelogDataReadWrite.java | 1 +
8 files changed, 131 insertions(+), 32 deletions(-)
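
Background sketch: after an ALTER TABLE changes a column's type, older data files and their file indexes still carry the old field definition, so a TopN hint built against the current table schema can no longer be answered safely by those indexes (and with deletion vectors enabled the index may also count deleted rows). The snippet below is adapted from the new test further down and assumes a RowType whose "price" column was altered from INT to BIGINT:

    // TopN on "price" (BIGINT in the current schema); files written before the
    // ALTER still store "price" as INT, so their range-bitmap index was built
    // against a different field definition and must not serve this TopN.
    DataField field = rowType.getField("price");
    SortValue sort =
            new SortValue(
                    new FieldRef(field.id(), field.name(), field.type()),
                    SortValue.SortDirection.DESCENDING,
                    SortValue.NullOrdering.NULLS_LAST);
    TopN topN = new TopN(Collections.singletonList(sort), 10);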
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index 6e012c016b..b4baf20cb1 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -143,6 +143,16 @@ public class TableSchema implements Serializable {
        return fields.stream().map(DataField::name).collect(Collectors.toList());
}
+ public Map<String, DataField> nameToFieldMap() {
+ return fields.stream()
+                .collect(Collectors.toMap(DataField::name, field -> field, (a, b) -> b));
+ }
+
+ public Map<Integer, DataField> idToFieldMap() {
+ return fields.stream()
+                .collect(Collectors.toMap(DataField::id, field -> field, (a, b) -> b));
+ }
+
public int highestFieldId() {
return highestFieldId;
}
@@ -156,14 +166,14 @@ public class TableSchema implements Serializable {
}
public List<String> trimmedPrimaryKeys() {
- if (primaryKeys.size() > 0) {
+ if (!primaryKeys.isEmpty()) {
List<String> adjusted =
primaryKeys.stream()
.filter(pk -> !partitionKeys.contains(pk))
.collect(Collectors.toList());
Preconditions.checkState(
- adjusted.size() > 0,
+ !adjusted.isEmpty(),
String.format(
"Primary key constraint %s should not be same with
partition fields %s,"
+ " this will result in only one record in
a partition",
@@ -192,7 +202,7 @@ public class TableSchema implements Serializable {
return false;
}
- return !primaryKeys.containsAll(partitionKeys);
+ return notContainsAll(primaryKeys, partitionKeys);
}
/** Original bucket keys, maybe empty. */
@@ -202,7 +212,7 @@ public class TableSchema implements Serializable {
return Collections.emptyList();
}
List<String> bucketKeys = Arrays.asList(key.split(","));
- if (!containsAll(fieldNames(), bucketKeys)) {
+ if (notContainsAll(fieldNames(), bucketKeys)) {
throw new RuntimeException(
String.format(
"Field names %s should contains all bucket keys
%s.",
@@ -214,8 +224,8 @@ public class TableSchema implements Serializable {
"Bucket keys %s should not in partition keys %s.",
bucketKeys, partitionKeys));
}
- if (primaryKeys.size() > 0) {
- if (!containsAll(primaryKeys, bucketKeys)) {
+ if (!primaryKeys.isEmpty()) {
+ if (notContainsAll(primaryKeys, bucketKeys)) {
throw new RuntimeException(
String.format(
"Primary keys %s should contains all bucket
keys %s.",
@@ -225,8 +235,8 @@ public class TableSchema implements Serializable {
return bucketKeys;
}
- private boolean containsAll(List<String> all, List<String> contains) {
- return new HashSet<>(all).containsAll(new HashSet<>(contains));
+ private boolean notContainsAll(List<String> all, List<String> contains) {
+ return !new HashSet<>(all).containsAll(new HashSet<>(contains));
}
public @Nullable String comment() {
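
A minimal usage sketch for the two new lookup helpers (assuming tableSchema is the current TableSchema, dataSchema the schema the data file was written with, and "price" a hypothetical column name):

    Map<String, DataField> tableFields = tableSchema.nameToFieldMap(); // name -> field, current schema
    Map<Integer, DataField> dataFields = dataSchema.idToFieldMap();    // field id -> field, file schema
    DataField tableField = tableFields.get("price");
    DataField dataField = tableField == null ? null : dataFields.get(tableField.id());
    // Identical definitions mean the file's index statistics are still valid for this field.
    boolean sameDefinition = Objects.equals(tableField, dataField);

This is the lookup pattern evolutionTopN relies on further down in FormatReaderMapping: fields are matched by name in the table schema and by id in the data schema, so both renames and type changes surface as a mismatch.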
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 0d9ad3b121..d7fa9eb66b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -83,7 +83,8 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
FileFormatDiscover.of(options),
pathFactory(),
options.fileIndexReadEnabled(),
- options.rowTrackingEnabled());
+ options.rowTrackingEnabled(),
+ options.deletionVectorsEnabled());
}
public DataEvolutionSplitRead newDataEvolutionRead() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index f7a743ace1..48d0bf872b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -132,7 +132,8 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
FileFormatDiscover.of(options),
pathFactory(),
options.fileIndexReadEnabled(),
- false);
+ false,
+ options.deletionVectorsEnabled());
}
public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
index b58907de1c..c71af3f2bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java
@@ -27,6 +27,8 @@ import org.apache.paimon.predicate.TopN;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.utils.ListUtils;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@@ -38,7 +40,7 @@ public class FileIndexEvaluator {
FileIO fileIO,
TableSchema dataSchema,
List<Predicate> dataFilter,
- TopN topN,
+ @Nullable TopN topN,
DataFilePathFactory dataFilePathFactory,
DataFileMeta file)
throws IOException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 44374a503a..27211a26bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -77,6 +77,7 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
private final TableSchema schema;
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
+ private final boolean deletionVectorsEnabled;
private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
private final boolean fileIndexReadEnabled;
private final boolean rowTrackingEnabled;
@@ -93,12 +94,14 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
boolean fileIndexReadEnabled,
- boolean rowTrackingEnabled) {
+ boolean rowTrackingEnabled,
+ boolean deletionVectorsEnabled) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schema = schema;
this.formatDiscover = formatDiscover;
this.pathFactory = pathFactory;
+ this.deletionVectorsEnabled = deletionVectorsEnabled;
this.formatReaderMappings = new HashMap<>();
this.fileIndexReadEnabled = fileIndexReadEnabled;
this.rowTrackingEnabled = rowTrackingEnabled;
@@ -131,7 +134,9 @@ public class RawFileSplitRead implements SplitRead<InternalRow> {
@Override
public SplitRead<InternalRow> withTopN(@Nullable TopN topN) {
- this.topN = topN;
+ if (!deletionVectorsEnabled) {
+ this.topN = topN;
+ }
return this;
}
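
Caller-side effect of the guard above, mirroring the new deletion-vector test added in AppendOnlySimpleTableTest further down: when the table was created with deletion vectors enabled, the TopN hint is silently dropped inside RawFileSplitRead and the read behaves as if no TopN had been requested.

    // Sketch: with DELETION_VECTORS_ENABLED = true the withTopN(...) call is a no-op,
    // so the reader still visits every row instead of an index-pruned subset.
    TableScan.Plan plan = table.newScan().plan();
    RecordReader<InternalRow> reader = table.newRead().withTopN(topN).createReader(plan.splits());
    AtomicInteger cnt = new AtomicInteger(0);
    reader.forEachRemaining(row -> cnt.incrementAndGet());
    // The new test asserts cnt.get() == rowCount: nothing was pruned away.
    reader.close();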
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
index 8422ed8e78..4f867b5ac1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java
@@ -23,6 +23,7 @@ import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.SortValue;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
@@ -41,6 +42,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.function.Function;
import static org.apache.paimon.predicate.PredicateBuilder.excludePredicateWithFields;
@@ -221,7 +223,25 @@ public class FormatReaderMapping {
dataSchema,
readFilters,
systemFields,
- topN);
+ evolutionTopN(tableSchema, dataSchema));
+ }
+
+ @Nullable
+    private TopN evolutionTopN(TableSchema tableSchema, TableSchema dataSchema) {
+ TopN pushTopN = topN;
+ if (pushTopN != null) {
+            Map<String, DataField> tableFields = tableSchema.nameToFieldMap();
+ Map<Integer, DataField> dataFields = dataSchema.idToFieldMap();
+ for (SortValue value : pushTopN.orders()) {
+                DataField tableField = tableFields.get(value.field().name());
+ DataField dataField = dataFields.get(tableField.id());
+ if (!Objects.equals(tableField, dataField)) {
+ pushTopN = null;
+ break;
+ }
+ }
+ }
+ return pushTopN;
}
public FormatReaderMapping build(
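
The essence of the evolutionTopN guard is a per-field identity check between the table schema and the data-file schema; a self-contained sketch with a hypothetical field id (DataField and DataTypes are Paimon types):

    // Same field id and name, but the column was altered from INT to BIGINT after
    // this file was written: the definitions are no longer equal, so the TopN
    // pushdown is discarded for this file and a plain read is used instead.
    DataField tableField = new DataField(2, "price", DataTypes.BIGINT()); // current table schema
    DataField dataField = new DataField(2, "price", DataTypes.INT());     // schema of the data file
    boolean pushable = Objects.equals(tableField, dataField);             // false -> drop the TopN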
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index 3188f3223c..babb62a995 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -46,6 +46,7 @@ import org.apache.paimon.predicate.SortValue;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
@@ -91,6 +92,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
import static org.apache.paimon.CoreOptions.DATA_FILE_PATH_DIRECTORY;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD;
@@ -831,25 +833,23 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
.field("event", DataTypes.STRING())
.field("price", DataTypes.INT())
.build();
+ Consumer<Options> configure =
+ options -> {
+ options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
+ options.set(WRITE_ONLY, true);
+ options.set(
+ FileIndexOptions.FILE_INDEX
+ + "."
+ + RangeBitmapFileIndexFactory.RANGE_BITMAP
+ + "."
+ + CoreOptions.COLUMNS,
+ "price");
+ options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
+                    options.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
+                    options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
+ };
// in unaware-bucket mode, we split files into splits all the time
- FileStoreTable table =
- createUnawareBucketFileStoreTable(
- rowType,
- options -> {
- options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
- options.set(WRITE_ONLY, true);
- options.set(
- FileIndexOptions.FILE_INDEX
- + "."
-                                            + RangeBitmapFileIndexFactory.RANGE_BITMAP
- + "."
- + CoreOptions.COLUMNS,
- "price");
-                            options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
-                            options.set(
-                                    ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
-                            options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
- });
+        FileStoreTable table = createUnawareBucketFileStoreTable(rowType, configure);
int bound = 300000;
int rowCount = 1000000;
@@ -918,6 +918,65 @@ public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
assertThat(cnt.get()).isEqualTo(rowCount);
reader.close();
}
+
+ // test should not push topN with index and evolution
+ {
+ table.schemaManager()
+                    .commitChanges(SchemaChange.updateColumnType("price", DataTypes.BIGINT()));
+ rowType =
+ RowType.builder()
+ .field("id", DataTypes.STRING())
+ .field("event", DataTypes.STRING())
+ .field("price", DataTypes.BIGINT())
+ .build();
+ table = createUnawareBucketFileStoreTable(rowType, configure);
+ DataField field = rowType.getField("price");
+ SortValue sort =
+ new SortValue(
+                            new FieldRef(field.id(), field.name(), field.type()),
+ SortValue.SortDirection.DESCENDING,
+ SortValue.NullOrdering.NULLS_LAST);
+ TopN topN = new TopN(Collections.singletonList(sort), k);
+ TableScan.Plan plan = table.newScan().plan();
+ RecordReader<InternalRow> reader =
+ table.newRead().withTopN(topN).createReader(plan.splits());
+ AtomicInteger cnt = new AtomicInteger(0);
+ reader.forEachRemaining(row -> cnt.incrementAndGet());
+ assertThat(cnt.get()).isEqualTo(rowCount);
+ reader.close();
+ }
+
+ // test should not push topN with dv modes
+ {
+ table.schemaManager()
+                    .commitChanges(SchemaChange.updateColumnType("price", DataTypes.INT()));
+ rowType =
+ RowType.builder()
+ .field("id", DataTypes.STRING())
+ .field("event", DataTypes.STRING())
+ .field("price", DataTypes.INT())
+ .build();
+ Consumer<Options> newConfigure =
+ options -> {
+ configure.accept(options);
+ options.set(DELETION_VECTORS_ENABLED, true);
+ };
+ table = createUnawareBucketFileStoreTable(rowType, newConfigure);
+ DataField field = rowType.getField("price");
+ SortValue sort =
+ new SortValue(
+                            new FieldRef(field.id(), field.name(), field.type()),
+ SortValue.SortDirection.DESCENDING,
+ SortValue.NullOrdering.NULLS_LAST);
+ TopN topN = new TopN(Collections.singletonList(sort), k);
+ TableScan.Plan plan = table.newScan().plan();
+ RecordReader<InternalRow> reader =
+ table.newRead().withTopN(topN).createReader(plan.splits());
+ AtomicInteger cnt = new AtomicInteger(0);
+ reader.forEachRemaining(row -> cnt.incrementAndGet());
+ assertThat(cnt.get()).isEqualTo(rowCount);
+ reader.close();
+ }
}
@Test
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index f234ae980a..2e9695a525 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -147,6 +147,7 @@ public class TestChangelogDataReadWrite {
FileFormatDiscover.of(options),
pathFactory,
options.fileIndexReadEnabled(),
+ false,
false);
return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
}