This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3ed824e983 [core] Fix issue: data evolution meet writeCols not equals to statsCols (#6481)
3ed824e983 is described below
commit 3ed824e9830270071c997a7cbd984db895f02374
Author: YeJunHao <[email protected]>
AuthorDate: Tue Oct 28 15:24:22 2025 +0800
[core] Fix issue: data evolution meet writeCols not equals to statsCols (#6481)
---
.../java/org/apache/paimon/schema/TableSchema.java | 2 +-
.../apache/paimon/reader/DataEvolutionArray.java | 2 +-
.../org/apache/paimon/reader/DataEvolutionRow.java | 2 +-
.../operation/DataEvolutionFileStoreScan.java | 58 ++--
.../operation/DataEvolutionFileStoreScanTest.java | 374 +++++++++++++++++++++
5 files changed, 415 insertions(+), 23 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
index fb2d16c7ed..8fee740fd1 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -281,7 +281,7 @@ public class TableSchema implements Serializable {
}
public TableSchema project(@Nullable List<String> writeCols) {
- if (writeCols == null || writeCols.isEmpty()) {
+ if (writeCols == null) {
return this;
}
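A note on the TableSchema.project change above: after this fix only a null column list means "keep the full schema", while an empty list is treated as a real projection to zero columns, which matters here because a data file can record an empty valueStatsCols list. A minimal sketch of that contract, using a hypothetical helper rather than the Paimon API:

    import java.util.Collections;
    import java.util.List;

    // Illustrative only: mirrors the null-vs-empty contract of project() above.
    public class ProjectionContractSketch {
        static List<String> project(List<String> allColumns, List<String> requested) {
            if (requested == null) {
                return allColumns; // null: no projection requested, keep everything
            }
            return requested; // empty list: project to zero columns, do not fall back
        }

        public static void main(String[] args) {
            List<String> all = List.of("f0", "f1", "f2");
            System.out.println(project(all, null));                    // [f0, f1, f2]
            System.out.println(project(all, Collections.emptyList())); // []
        }
    }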
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
index 7a14a3ef9c..7ebccba6b2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionArray.java
@@ -72,7 +72,7 @@ public class DataEvolutionArray implements InternalArray {
@Override
public boolean isNullAt(int pos) {
- if (rowOffsets[pos] == -1) {
+ if (rowOffsets[pos] < 0) {
return true;
}
return chooseArray(pos).isNullAt(offsetInRow(pos));
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
index 824a0c438d..cc5cf2b18b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/DataEvolutionRow.java
@@ -96,7 +96,7 @@ public class DataEvolutionRow implements InternalRow {
@Override
public boolean isNullAt(int pos) {
- if (rowOffsets[pos] == -1) {
+ if (rowOffsets[pos] < 0) {
return true;
}
return chooseRow(pos).isNullAt(offsetInRow(pos));
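The two isNullAt changes above relax the null check from == -1 to < 0: judging from the scan change later in this patch, -1 still means no file provides the field at that position, while -2 marks a field that was written without any recorded stats, and both cases must read as null. A standalone sketch of that sentinel convention (plain Java with illustrative names, not the Paimon reader classes):

    // Illustrative only: the sentinel convention behind rowOffsets.
    //   >= 0 : index of the backing row/array that provides this field
    //   -1   : no file wrote this field at all
    //   -2   : the field was written, but no stats were recorded for it
    public class NullOffsetSketch {
        static boolean isNullAt(int[] rowOffsets, int pos) {
            return rowOffsets[pos] < 0; // any negative sentinel is treated as null
        }

        public static void main(String[] args) {
            int[] rowOffsets = {0, -1, -2};
            System.out.println(isNullAt(rowOffsets, 0)); // false: backed by row 0
            System.out.println(isNullAt(rowOffsets, 1)); // true: field missing
            System.out.println(isNullAt(rowOffsets, 2)); // true: stats missing
        }
    }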
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
index 03f36d2ea0..71380ba88a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionFileStoreScan.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.DataFileMeta;
@@ -30,7 +31,6 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsEvolution;
-import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.source.DataEvolutionSplitGenerator;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.SnapshotManager;
@@ -38,6 +38,7 @@ import org.apache.paimon.utils.SnapshotManager;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.function.Function;
import java.util.stream.Collectors;
/** {@link FileStoreScan} for data-evolution enabled table. */
@@ -98,7 +99,8 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
private boolean filterByStats(List<ManifestEntry> metas) {
long rowCount = metas.get(0).file().rowCount();
- SimpleStatsEvolution.Result evolutionResult = evolutionStats(metas);
+ SimpleStatsEvolution.Result evolutionResult =
+ evolutionStats(schema, this::scanTableSchema, metas);
return inputFilter.test(
rowCount,
evolutionResult.minValues(),
@@ -106,7 +108,11 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
evolutionResult.nullCounts());
}
- private SimpleStatsEvolution.Result evolutionStats(List<ManifestEntry> metas) {
+ @VisibleForTesting
+ static SimpleStatsEvolution.Result evolutionStats(
+ TableSchema schema,
+ Function<Long, TableSchema> scanTableSchema,
+ List<ManifestEntry> metas) {
int[] allFields = schema.fields().stream().mapToInt(DataField::id).toArray();
int fieldsCount = schema.fields().size();
int[] rowOffsets = new int[fieldsCount];
@@ -127,31 +133,43 @@ public class DataEvolutionFileStoreScan extends AppendOnlyFileStoreScan {
for (int i = 0; i < metas.size(); i++) {
DataFileMeta fileMeta = metas.get(i).file();
+
TableSchema dataFileSchema =
- scanTableSchema(fileMeta.schemaId())
- .project(
- fileMeta.valueStatsCols() == null
- ? fileMeta.writeCols()
- : fileMeta.valueStatsCols());
+ scanTableSchema.apply(fileMeta.schemaId()).project(fileMeta.writeCols());
+
+ TableSchema dataFileSchemaWithStats = dataFileSchema.project(fileMeta.valueStatsCols());
+
int[] fieldIds =
- SpecialFields.rowTypeWithRowTracking(dataFileSchema.logicalRowType())
- .getFields().stream()
+ dataFileSchema.logicalRowType().getFields().stream()
.mapToInt(DataField::id)
.toArray();
- int count = 0;
+ int[] fieldIdsWithStats =
+ dataFileSchemaWithStats.logicalRowType().getFields().stream()
+ .mapToInt(DataField::id)
+ .toArray();
+
+ loop1:
for (int j = 0; j < fieldsCount; j++) {
+ if (rowOffsets[j] != -1) {
+ continue;
+ }
+ int targetFieldId = allFields[j];
for (int fieldId : fieldIds) {
- if (allFields[j] == fieldId) {
- // TODO: If type not match (e.g. int -> string), we need to skip this, set
- // rowOffsets[j] = -1 always. (may -2, after all, set it back to -1)
- // Because schema evolution may happen to change int to string or something
- // like that.
- if (rowOffsets[j] == -1) {
- rowOffsets[j] = i;
- fieldOffsets[j] = count++;
+ if (targetFieldId == fieldId) {
+ for (int k = 0; k < fieldIdsWithStats.length; k++) {
+ if (fieldId == fieldIdsWithStats[k]) {
+ // TODO: If type not match (e.g. int -> string), we need to skip
+ // this, set rowOffsets[j] = -1 always. (may -2, after all, set it
+ // back to -1) Because schema evolution may happen to change int to
+ // string or something like that.
+ rowOffsets[j] = i;
+ fieldOffsets[j] = k;
+ continue loop1;
+ }
}
- break;
+ rowOffsets[j] = -2;
+ continue loop1;
}
}
}
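The reworked loop above separates the columns a file wrote (writeCols) from the columns it kept stats for (valueStatsCols). Roughly: for each field of the current schema it finds the first file that wrote the field, then looks the field up among that file's stats columns; a field that was written but has no stats column gets offset -2, so the merged min/max/null-count values read as null instead of borrowing a neighboring column. A simplified, self-contained sketch of that mapping (plain Java with made-up arrays, not the Paimon classes):

    import java.util.Arrays;

    // Illustrative only: builds rowOffsets/fieldOffsets the same way as the loop above,
    // but from plain int arrays instead of TableSchema/DataFileMeta.
    public class StatsOffsetSketch {
        static void map(
                int[] allFields,         // field ids of the current table schema
                int[][] writeIdsPerFile, // per file: field ids it wrote (writeCols)
                int[][] statsIdsPerFile, // per file: field ids it kept stats for (valueStatsCols)
                int[] rowOffsets,
                int[] fieldOffsets) {
            Arrays.fill(rowOffsets, -1);
            for (int file = 0; file < writeIdsPerFile.length; file++) {
                fields:
                for (int j = 0; j < allFields.length; j++) {
                    if (rowOffsets[j] != -1) {
                        continue; // already resolved, or marked written-without-stats
                    }
                    for (int writtenId : writeIdsPerFile[file]) {
                        if (writtenId == allFields[j]) {
                            int[] statsIds = statsIdsPerFile[file];
                            for (int k = 0; k < statsIds.length; k++) {
                                if (statsIds[k] == writtenId) {
                                    rowOffsets[j] = file; // stats come from this file
                                    fieldOffsets[j] = k;  // at this position in its stats row
                                    continue fields;
                                }
                            }
                            rowOffsets[j] = -2; // written by this file, but no stats column
                            continue fields;
                        }
                    }
                }
            }
        }

        public static void main(String[] args) {
            int[] allFields = {0, 1, 2};
            int[][] writeIds = {{0, 1, 2}, {0, 2}};
            int[][] statsIds = {{0, 1}, {0, 2}};
            int[] rowOffsets = new int[3];
            int[] fieldOffsets = new int[3];
            map(allFields, writeIds, statsIds, rowOffsets, fieldOffsets);
            System.out.println(Arrays.toString(rowOffsets));   // [0, 0, -2]
            System.out.println(Arrays.toString(fieldOffsets)); // [0, 1, 0]
        }
    }

This mirrors the scenario exercised by testEvolutionStatsWithWriteColsNotEqualToValueStatsCols in the new test below: the first file writes f0, f1, f2 but only keeps stats for f0 and f1, so the merged stats for f2 come back as null.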
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
new file mode 100644
index 0000000000..7895d8d8c6
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionFileStoreScanTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.BinaryArray;
+import org.apache.paimon.data.BinaryArrayWriter;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.reader.DataEvolutionArray;
+import org.apache.paimon.reader.DataEvolutionRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.stats.SimpleStatsEvolution;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataEvolutionFileStoreScan}. */
+public class DataEvolutionFileStoreScanTest {
+
+ private Map<Long, TableSchema> schemas;
+ private Function<Long, TableSchema> scanTableSchema;
+
+ @BeforeEach
+ public void setUp() {
+ schemas = new HashMap<>();
+ scanTableSchema = schemas::get;
+ }
+
+ @Test
+ public void testEvolutionStatsSingleFile() {
+ Schema schema = createSchema("f0", "f1");
+ TableSchema tableSchema = TableSchema.create(0L, schema);
+ schemas.put(0L, tableSchema);
+
+ ManifestEntry entry =
+ createManifestEntry(
+ 0L,
+ createSimpleStats(
+ GenericRow.of(1, BinaryString.fromString("a")),
+ GenericRow.of(5, BinaryString.fromString("z")),
+ createBinaryArray(new int[] {0, 1}),
+ new int[] {0, 1}));
+
+ SimpleStatsEvolution.Result result =
+ DataEvolutionFileStoreScan.evolutionStats(
+ tableSchema, scanTableSchema, Collections.singletonList(entry));
+
+ assertThat(result).isNotNull();
+ assertThat(result.minValues()).isInstanceOf(DataEvolutionRow.class);
+ assertThat(result.maxValues()).isInstanceOf(DataEvolutionRow.class);
+ assertThat(result.nullCounts()).isInstanceOf(DataEvolutionArray.class);
+
+ DataEvolutionRow minRow = (DataEvolutionRow) result.minValues();
+ DataEvolutionRow maxRow = (DataEvolutionRow) result.maxValues();
+ DataEvolutionArray nullCounts = (DataEvolutionArray) result.nullCounts();
+
+ assertThat(minRow.rowNumber()).isEqualTo(1);
+ assertThat(maxRow.rowNumber()).isEqualTo(1);
+ assertThat(nullCounts.size()).isEqualTo(2);
+
+ assertThat(minRow.getInt(0)).isEqualTo(1);
+ assertThat(maxRow.getInt(0)).isEqualTo(5);
+ assertThat(minRow.getString(1).toString()).isEqualTo("a");
+ assertThat(maxRow.getString(1).toString()).isEqualTo("z");
+
+ assertThat(nullCounts.getInt(0)).isEqualTo(0);
+ assertThat(nullCounts.getInt(1)).isEqualTo(1);
+
+ assertThat(minRow.getFieldCount()).isEqualTo(2);
+ assertThat(maxRow.getFieldCount()).isEqualTo(2);
+ }
+
+ @Test
+ public void testEvolutionStatsMultipleFiles() {
+ Schema schema = createSchema("f0", "f1", "f2");
+ TableSchema tableSchema = TableSchema.create(0L, schema);
+ schemas.put(0L, tableSchema);
+ schemas.put(1L, tableSchema.project(Arrays.asList("f0", "f2")));
+
+ ManifestEntry entry1 =
+ createManifestEntry(
+ 0L,
+ createSimpleStats(
+ GenericRow.of(1, BinaryString.fromString("a"), 10),
+ GenericRow.of(3, BinaryString.fromString("c"), 30),
+ createBinaryArray(new int[] {0, 1, 0}),
+ new int[] {0, 1, 2}));
+
+ ManifestEntry entry2 =
+ createManifestEntry(
+ 1L,
+ createSimpleStats(
+ GenericRow.of(2, 20),
+ GenericRow.of(4, 40),
+ createBinaryArray(new int[] {1, 2}),
+ new int[] {0, 2}));
+
+ List<ManifestEntry> entries = Arrays.asList(entry2, entry1);
+
+ SimpleStatsEvolution.Result result =
+ DataEvolutionFileStoreScan.evolutionStats(tableSchema, scanTableSchema, entries);
+
+ assertThat(result).isNotNull();
+ DataEvolutionRow minRow = (DataEvolutionRow) result.minValues();
+ DataEvolutionRow maxRow = (DataEvolutionRow) result.maxValues();
+ DataEvolutionArray nullCounts = (DataEvolutionArray) result.nullCounts();
+
+ assertThat(minRow.getInt(0)).isEqualTo(2);
+ assertThat(maxRow.getInt(0)).isEqualTo(4);
+ assertThat(minRow.getInt(2)).isEqualTo(20);
+ assertThat(maxRow.getInt(2)).isEqualTo(40);
+ assertThat(minRow.getString(1).toString()).isEqualTo("a");
+ assertThat(maxRow.getString(1).toString()).isEqualTo("c");
+ assertThat(nullCounts.getInt(0)).isEqualTo(1);
+ assertThat(nullCounts.getInt(1)).isEqualTo(1);
+ assertThat(nullCounts.getInt(2)).isEqualTo(2);
+ }
+
+ @Test
+ public void testEvolutionStatsWithSchemaEvolution() {
+ Schema baseSchema = createSchema("f0", "f1");
+ TableSchema baseTableSchema = TableSchema.create(0L, baseSchema);
+ schemas.put(0L, baseTableSchema);
+
+ Schema evolvedSchema = createSchema("f0", "f1", "f2");
+ TableSchema evolvedTableSchema = TableSchema.create(1L, evolvedSchema);
+ schemas.put(1L, evolvedTableSchema);
+
+ ManifestEntry entry1 =
+ createManifestEntry(
+ 0L,
+ createSimpleStats(
+ GenericRow.of(1, BinaryString.fromString("a")),
+ GenericRow.of(3, BinaryString.fromString("c")),
+ createBinaryArray(new int[] {0, 1}),
+ new int[] {0, 1}));
+
+ ManifestEntry entry2 =
+ createManifestEntry(
+ 1L,
+ createSimpleStats(
+ GenericRow.of(2, BinaryString.fromString("b"), 20),
+ GenericRow.of(4, BinaryString.fromString("d"), 40),
+ createBinaryArray(new int[] {1, 0, 1}),
+ new int[] {0, 1, 2}));
+
+ List<ManifestEntry> entries = Arrays.asList(entry1, entry2);
+
+ SimpleStatsEvolution.Result result =
+ DataEvolutionFileStoreScan.evolutionStats(
+ evolvedTableSchema, scanTableSchema, entries);
+
+ assertThat(result).isNotNull();
+ DataEvolutionRow minRow = (DataEvolutionRow) result.minValues();
+ DataEvolutionRow maxRow = (DataEvolutionRow) result.maxValues();
+ DataEvolutionArray nullCounts = (DataEvolutionArray) result.nullCounts();
+
+ assertThat(minRow.getInt(0)).isEqualTo(1);
+ assertThat(maxRow.getInt(0)).isEqualTo(3);
+
+ assertThat(minRow.getString(1).toString()).isEqualTo("a");
+ assertThat(maxRow.getString(1).toString()).isEqualTo("c");
+
+ assertThat(minRow.getInt(2)).isEqualTo(20);
+ assertThat(maxRow.getInt(2)).isEqualTo(40);
+
+ assertThat(nullCounts.getInt(0)).isEqualTo(0);
+ assertThat(nullCounts.getInt(1)).isEqualTo(1);
+ assertThat(nullCounts.getInt(2)).isEqualTo(1);
+ }
+
+ @Test
+ public void testEvolutionStatsWithWriteColsNotEqualToValueStatsCols() {
+ Schema schema = createSchema("f0", "f1", "f2");
+ TableSchema tableSchema = TableSchema.create(0L, schema);
+ schemas.put(0L, tableSchema);
+ schemas.put(1L, tableSchema);
+
+ ManifestEntry entry1 =
+ createManifestEntryWithDifferentCols(
+ 0L,
+ new String[] {"f0", "f1", "f2"},
+ new String[] {"f0", "f1"},
+ createSimpleStats(
+ GenericRow.of(1, BinaryString.fromString("a")),
+ GenericRow.of(3, BinaryString.fromString("c")),
+ createBinaryArray(new int[] {0, 1}),
+ new int[] {0, 1}));
+
+ ManifestEntry entry2 =
+ createManifestEntryWithDifferentCols(
+ 1L,
+ new String[] {"f0", "f2"},
+ new String[] {"f0", "f2"},
+ createSimpleStats(
+ GenericRow.of(2, 20),
+ GenericRow.of(4, 40),
+ createBinaryArray(new int[] {1, 0}),
+ new int[] {0, 2}));
+
+ List<ManifestEntry> entries = Arrays.asList(entry1, entry2);
+
+ SimpleStatsEvolution.Result result =
+ DataEvolutionFileStoreScan.evolutionStats(tableSchema, scanTableSchema, entries);
+
+ assertThat(result).isNotNull();
+ DataEvolutionRow minRow = (DataEvolutionRow) result.minValues();
+ DataEvolutionRow maxRow = (DataEvolutionRow) result.maxValues();
+ DataEvolutionArray nullCounts = (DataEvolutionArray) result.nullCounts();
+
+ assertThat(minRow.getInt(0)).isEqualTo(1);
+ assertThat(maxRow.getInt(0)).isEqualTo(3);
+
+ assertThat(minRow.getString(1).toString()).isEqualTo("a");
+ assertThat(maxRow.getString(1).toString()).isEqualTo("c");
+
+ assertThat(minRow.isNullAt(2)).isTrue();
+ assertThat(maxRow.isNullAt(2)).isTrue();
+
+ assertThat(nullCounts.getInt(0)).isEqualTo(0);
+ assertThat(nullCounts.getInt(1)).isEqualTo(1);
+ assertThat(nullCounts.isNullAt(2)).isTrue();
+ }
+
+ private Schema createSchema(String... fieldNames) {
+ Schema.Builder builder = Schema.newBuilder();
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (i == 0) {
+ builder.column(fieldNames[i], DataTypes.INT());
+ } else if (i == 1) {
+ builder.column(fieldNames[i], DataTypes.STRING());
+ } else {
+ builder.column(fieldNames[i], DataTypes.INT());
+ }
+ }
+ return builder.build();
+ }
+
+ private ManifestEntry createManifestEntry(Long schemaId, SimpleStats stats) {
+ DataFileMeta fileMeta =
+ DataFileMeta.create(
+ "test-file.parquet",
+ 100L,
+ 100L,
+ createBinaryRow(1),
+ createBinaryRow(100),
+ stats,
+ stats,
+ 0L,
+ 0L,
+ schemaId,
+ 0,
+ Collections.emptyList(),
+ null,
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ null,
+ null);
+
+ return ManifestEntry.create(FileKind.ADD, createBinaryRow(0), 0, 0, fileMeta);
+ }
+
+ private ManifestEntry createManifestEntryWithDifferentCols(
+ Long schemaId, String[] writeCols, String[] valueStatsCols, SimpleStats stats) {
+ DataFileMeta fileMeta =
+ DataFileMeta.create(
+ "test-file.parquet",
+ 100L,
+ 100L,
+ createBinaryRow(1),
+ createBinaryRow(100),
+ stats,
+ stats,
+ 0L,
+ 0L,
+ schemaId,
+ 0,
+ Collections.emptyList(),
+ null,
+ null,
+ FileSource.APPEND,
+ Arrays.stream(valueStatsCols).collect(Collectors.toList()),
+ null,
+ null,
+ Arrays.stream(writeCols).collect(Collectors.toList()));
+
+ return ManifestEntry.create(FileKind.ADD, createBinaryRow(0), 0, 0, fileMeta);
+ }
+
+ private BinaryRow createBinaryRow(int value) {
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeInt(0, value);
+ writer.complete();
+ return row;
+ }
+
+ private BinaryArray createBinaryArray(int[] values) {
+ BinaryArray array = new BinaryArray();
+ BinaryArrayWriter writer = new BinaryArrayWriter(array, values.length, 4);
+ for (int i = 0; i < values.length; i++) {
+ writer.writeInt(i, values[i]);
+ }
+ writer.complete();
+ return array;
+ }
+
+ private SimpleStats createSimpleStats(
+ InternalRow minValues, InternalRow maxValues, BinaryArray nullCounts, int[] fields) {
+ return new SimpleStats(
+ convertToBinaryRow(minValues, fields),
+ convertToBinaryRow(maxValues, fields),
+ nullCounts);
+ }
+
+ private BinaryRow convertToBinaryRow(InternalRow row, int[] fields) {
+ BinaryRow binaryRow = new BinaryRow(fields.length);
+ BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
+ for (int i = 0; i < fields.length; i++) {
+ int fieldId = fields[i];
+ if (i >= row.getFieldCount() || row.isNullAt(i)) {
+ writer.setNullAt(i);
+ } else {
+ if (fieldId == 0) {
+ writer.writeInt(i, row.getInt(i));
+ } else if (fieldId == 1) {
+ writer.writeString(i, row.getString(i));
+ } else {
+ writer.writeInt(i, row.getInt(i));
+ }
+ }
+ }
+ writer.complete();
+ return binaryRow;
+ }
+}