This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 6364aaae20 Data: Add TCK tests for Schema Evolution in BaseFormatModelTests (#15843)
6364aaae20 is described below
commit 6364aaae20c50b8bfeb550ec89b3b14feedd51b0
Author: GuoYu <[email protected]>
AuthorDate: Mon May 11 18:36:36 2026 +0800
Data: Add TCK tests for Schema Evolution in BaseFormatModelTests (#15843)
---
build.gradle | 9 +
.../org/apache/iceberg/avro/AvroTestHelpers.java | 8 +
.../apache/iceberg/data/BaseFormatModelTests.java | 450 +++++++++++++++++++++
flink/v1.20/build.gradle | 2 +
flink/v2.0/build.gradle | 2 +
flink/v2.1/build.gradle | 2 +
.../apache/iceberg/orc/OrcWritingTestUtils.java | 35 ++
.../org/apache/iceberg/orc/TestORCSchemaUtil.java | 8 +
.../iceberg/parquet/ParquetWritingTestUtils.java | 2 +-
.../iceberg/parquet/ParquetFileTestUtils.java | 36 ++
spark/v3.4/build.gradle | 2 +
spark/v3.5/build.gradle | 1 +
spark/v4.0/build.gradle | 1 +
spark/v4.1/build.gradle | 1 +
14 files changed, 558 insertions(+), 1 deletion(-)
diff --git a/build.gradle b/build.gradle
index 261dfabf04..fca32be9dc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -457,6 +457,8 @@ project(':iceberg-data') {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-orc', configuration:
'testArtifacts')
+ testImplementation(testFixtures(project(':iceberg-parquet')))
}
test {
@@ -939,6 +941,13 @@ project(':iceberg-parquet') {
exclude group: 'org.apache.avro', module: 'avro'
}
+ testFixturesApi(libs.parquet.hadoop) {
+ exclude group: 'org.apache.avro', module: 'avro'
+ // already shaded by Parquet
+ exclude group: 'it.unimi.dsi'
+ exclude group: 'org.codehaus.jackson'
+ }
+
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
}
diff --git a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
index 0a1cf43f4f..fd73706ce0 100644
--- a/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
+++ b/core/src/test/java/org/apache/iceberg/avro/AvroTestHelpers.java
@@ -177,4 +177,12 @@ public class AvroTestHelpers {
return reader.getMetaString("avro.codec");
}
}
+
+ public static boolean hasIds(Schema schema) {
+ return AvroSchemaUtil.hasIds(schema);
+ }
+
+ public static Schema removeIds(org.apache.iceberg.Schema schema) {
+ return RemoveIds.removeIds(schema);
+ }
}
diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
index d0b8e3161b..a38b025e0f 100644
--- a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
+++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assumptions.assumeFalse;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
@@ -38,6 +39,14 @@ import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.IntStream;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -52,6 +61,8 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestTables;
+import org.apache.iceberg.avro.AvroTestHelpers;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -70,6 +81,15 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.orc.ORCSchemaUtil;
+import org.apache.iceberg.orc.OrcRowWriter;
+import org.apache.iceberg.orc.OrcWritingTestUtils;
+import org.apache.iceberg.orc.TestORCSchemaUtil;
+import org.apache.iceberg.parquet.ParquetFileTestUtils;
+import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -77,6 +97,14 @@ import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
@@ -1330,6 +1358,280 @@ public abstract class BaseFormatModelTests<T> {
readAndAssertGenericRecords(fileFormat, schema,
sourceRecords.stream().map(transform).toList());
}
+ /**
+ * Schema evolution: Adding column (reading with wider schema). Write with
DefaultSchema, read
+ * with additional optional columns. The new columns should be filled with
null values.
+ */
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionAddColumn(FileFormat fileFormat) throws IOException {
+ DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+ Schema writeSchema = dataGenerator.schema();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+ List<Types.NestedField> evolvedColumns =
Lists.newArrayList(writeSchema.columns());
+
+ int maxFieldId =
+
writeSchema.columns().stream().mapToInt(Types.NestedField::fieldId).max().orElse(0);
+ evolvedColumns.add(
+ Types.NestedField.optional("new_string_col")
+ .withId(maxFieldId + 1)
+ .ofType(Types.StringType.get())
+ .build());
+ evolvedColumns.add(
+ Types.NestedField.optional("new_int_col")
+ .withId(maxFieldId + 2)
+ .ofType(Types.IntegerType.get())
+ .build());
+ Schema readSchema = new Schema(evolvedColumns);
+ readAndAssertEngineRecords(
+ fileFormat,
+ readSchema,
+ genericRecords,
+ record -> {
+ Record expected = copy(record, writeSchema, readSchema);
+
+ expected.setField("new_string_col", null);
+ expected.setField("new_int_col", null);
+ return expected;
+ });
+ }
+
+ /**
+ * Schema evolution: Projection / Removing column (reading with narrower
schema). Write with
+ * DefaultSchema, read with only a subset of columns (skipping middle
columns).
+ */
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionProjection(FileFormat fileFormat) throws IOException
{
+ DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+ Schema writeSchema = dataGenerator.schema();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+ List<Types.NestedField> writeColumns = writeSchema.columns();
+ assumeThat(writeColumns).hasSizeGreaterThanOrEqualTo(2);
+ Schema projectedSchema =
+ new Schema(writeColumns.get(0), writeColumns.get(writeColumns.size() -
1));
+
+ readAndAssertEngineRecords(
+ fileFormat,
+ projectedSchema,
+ genericRecords,
+ record -> copy(record, projectedSchema, projectedSchema));
+ }
+
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionDropAndReAddSameNameColumn(FileFormat fileFormat)
throws IOException {
+
+ DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+ Schema writeSchema = dataGenerator.schema();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+ // Remove col_b and add a new col_b with a different field ID
+ Schema readSchema =
+ new Schema(
+ Types.NestedField.required(1, "col_a", Types.StringType.get()),
+ Types.NestedField.optional(6, "col_b", Types.IntegerType.get()),
+ Types.NestedField.required(3, "col_c", Types.LongType.get()),
+ Types.NestedField.required(4, "col_d", Types.FloatType.get()),
+ Types.NestedField.required(5, "col_e", Types.DoubleType.get()));
+
+ readAndAssertEngineRecords(
+ fileFormat,
+ readSchema,
+ genericRecords,
+ record -> {
+ Record expected = GenericRecord.create(readSchema);
+ expected.setField("col_a", record.getField("col_a"));
+ expected.setField("col_b", null);
+ expected.setField("col_c", record.getField("col_c"));
+ expected.setField("col_d", record.getField("col_d"));
+ expected.setField("col_e", record.getField("col_e"));
+ return expected;
+ });
+ }
+
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionTypePromotionIntToLong(FileFormat fileFormat) throws
IOException {
+ runTypePromotionCheck(
+ fileFormat,
+ Types.IntegerType.get(),
+ Types.LongType.get(),
+ value -> value == null ? null : ((Integer) value).longValue());
+ }
+
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionTypePromotionFloatToDouble(FileFormat fileFormat)
throws IOException {
+ runTypePromotionCheck(
+ fileFormat,
+ Types.FloatType.get(),
+ Types.DoubleType.get(),
+ value -> value == null ? null : ((Float) value).doubleValue());
+ }
+
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionTypePromotionDecimalPrecision(FileFormat fileFormat)
throws IOException {
+ runTypePromotionCheck(
+ fileFormat, Types.DecimalType.of(9, 2), Types.DecimalType.of(18, 2),
Function.identity());
+ }
+
+ /**
+ * Schema evolution: Reorder columns. Write with DefaultSchema {col_a,
col_b, col_c, col_d,
+ * col_e}, read with reordered schema {col_e, col_c, col_a, col_d, col_b}.
+ */
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionReorderColumns(FileFormat fileFormat) throws
IOException {
+ DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+ Schema writeSchema = dataGenerator.schema();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+ Schema reorderedSchema =
+ new Schema(
+ Types.NestedField.required(5, "col_e", Types.DoubleType.get()),
+ Types.NestedField.required(3, "col_c", Types.LongType.get()),
+ Types.NestedField.required(1, "col_a", Types.StringType.get()),
+ Types.NestedField.required(4, "col_d", Types.FloatType.get()),
+ Types.NestedField.required(2, "col_b", Types.IntegerType.get()));
+
+ readAndAssertEngineRecords(
+ fileFormat,
+ reorderedSchema,
+ genericRecords,
+ record -> copy(record, reorderedSchema, reorderedSchema));
+ }
+
+ /**
+ * Schema evolution: Rename column. Write with DefaultSchema where col_b has
field ID 2. Read with
+ * a schema where the same field ID 2 is renamed to "column_b". Since
Iceberg binds by field ID,
+ * the renamed column should still read the original data correctly.
+ */
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionRenameColumn(FileFormat fileFormat) throws
IOException {
+ DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+ Schema writeSchema = dataGenerator.schema();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+ // rename col_b(id=2) -> column_b, col_d(id=4) -> column_d
+ Schema renamedSchema =
+ new Schema(
+ Types.NestedField.required(1, "col_a", Types.StringType.get()),
+ Types.NestedField.required(2, "column_b", Types.IntegerType.get()),
+ Types.NestedField.required(3, "col_c", Types.LongType.get()),
+ Types.NestedField.required(4, "column_d", Types.FloatType.get()),
+ Types.NestedField.required(5, "col_e", Types.DoubleType.get()));
+
+ readAndAssertEngineRecords(
+ fileFormat,
+ renamedSchema,
+ genericRecords,
+ record -> {
+ Record expected = GenericRecord.create(renamedSchema);
+ expected.setField("col_a", record.getField("col_a"));
+ expected.setField("column_b", record.getField("col_b"));
+ expected.setField("col_c", record.getField("col_c"));
+ expected.setField("column_d", record.getField("col_d"));
+ expected.setField("col_e", record.getField("col_e"));
+ return expected;
+ });
+ }
+
+ /**
+ * Schema evolution: Required → Optional. Write with DefaultSchema where all
columns are required.
+ * Read with a schema where some columns are changed to optional. Iceberg
allows widening required
+ * to optional. The data should still be read correctly.
+ */
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionRequiredToOptional(FileFormat fileFormat) throws
IOException {
+ DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+ Schema writeSchema = dataGenerator.schema();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+ // change col_b and col_d to optional
+ Schema readSchema =
+ new Schema(
+ Types.NestedField.required(1, "col_a", Types.StringType.get()),
+ Types.NestedField.optional(2, "col_b", Types.IntegerType.get()),
+ Types.NestedField.required(3, "col_c", Types.LongType.get()),
+ Types.NestedField.optional(4, "col_d", Types.FloatType.get()),
+ Types.NestedField.required(5, "col_e", Types.DoubleType.get()));
+
+ readAndAssertEngineRecords(
+ fileFormat, readSchema, genericRecords, record -> copy(record,
readSchema, readSchema));
+ }
+
+ /**
+ * Schema evolution: Read with empty projection. Write with DefaultSchema,
read with an empty
+ * schema (no columns). The reader should return the correct number of rows
but with no data
+ * columns.
+ */
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testSchemaEvolutionEmptyProjection(FileFormat fileFormat) throws
IOException {
+ DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+ Schema writeSchema = dataGenerator.schema();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+ writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+ Schema emptySchema = new Schema();
+
+ InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+ List<T> readRecords;
+ try (CloseableIterable<T> reader =
+ FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+ .project(emptySchema)
+ .build()) {
+ readRecords = ImmutableList.copyOf(reader);
+ }
+
+ assertThat(readRecords).hasSameSizeAs(genericRecords);
+ }
+
+ @ParameterizedTest
+ @FieldSource("FILE_FORMATS")
+ void testReadFileWithoutFieldIdsUsingNameMapping(FileFormat fileFormat)
throws IOException {
+ DataGenerator dataGenerator = new DataGenerators.DefaultSchema();
+ Schema icebergSchema = dataGenerator.schema();
+
+ List<Record> genericRecords = dataGenerator.generateRecords();
+
+ // Write the file WITHOUT Iceberg field IDs (as an external writer would).
+ writeRecordsWithoutFieldIds(fileFormat, icebergSchema, genericRecords);
+
+ NameMapping nameMapping = MappingUtil.create(icebergSchema);
+
+ InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+ List<T> readRecords;
+ try (CloseableIterable<T> reader =
+ FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+ .project(icebergSchema)
+ .withNameMapping(nameMapping)
+ .build()) {
+ readRecords = ImmutableList.copyOf(reader);
+ }
+
+ assertEquals(icebergSchema, convertToEngineRecords(genericRecords,
icebergSchema), readRecords);
+ }
+
private void readAndAssertGenericRecords(
FileFormat fileFormat, Schema schema, List<Record> expected) throws
IOException {
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
@@ -1777,4 +2079,152 @@ public abstract class BaseFormatModelTests<T> {
return result;
}
+
+ private void writeRecordsWithoutFieldIds(
+ FileFormat fileFormat, Schema schema, List<Record> records) throws
IOException {
+ switch (fileFormat) {
+ case PARQUET -> writeParquetWithoutFieldIds(schema, records);
+ case AVRO -> writeAvroWithoutFieldIds(schema, records);
+ case ORC -> writeOrcWithoutFieldIds(schema, records);
+ default -> throw new UnsupportedOperationException("Unsupported file
format: " + fileFormat);
+ }
+ }
+
+ private void writeAvroWithoutFieldIds(Schema schema, List<Record> records)
throws IOException {
+ org.apache.avro.Schema avroSchemaWithoutIds =
AvroTestHelpers.removeIds(schema);
+
+ OutputFile outputFile = encryptedFile.encryptingOutputFile();
+ DatumWriter<GenericData.Record> datumWriter = new
GenericDatumWriter<>(avroSchemaWithoutIds);
+ try (OutputStream out = outputFile.create();
+ DataFileWriter<GenericData.Record> writer = new
DataFileWriter<>(datumWriter)) {
+ writer.create(avroSchemaWithoutIds, out);
+ for (Record record : records) {
+ GenericData.Record avroRecord = new
GenericData.Record(avroSchemaWithoutIds);
+ for (Types.NestedField field : schema.columns()) {
+ avroRecord.put(field.name(), record.getField(field.name()));
+ }
+
+ writer.append(avroRecord);
+ }
+ }
+
+ try (DataFileStream<GenericData.Record> reader =
+ new DataFileStream<>(outputFile.toInputFile().newStream(), new
GenericDatumReader<>())) {
+ assertThat(AvroTestHelpers.hasIds(reader.getSchema())).isFalse();
+ }
+ }
+
+ private void writeParquetWithoutFieldIds(Schema schema, List<Record>
records) throws IOException {
+ org.apache.avro.Schema avroSchemaWithoutIds =
AvroTestHelpers.removeIds(schema);
+
+ OutputFile outputFile = encryptedFile.encryptingOutputFile();
+
+ try (ParquetWriter<GenericData.Record> writer =
+
AvroParquetWriter.<GenericData.Record>builder(ParquetFileTestUtils.file(outputFile))
+ .withDataModel(GenericData.get())
+ .withSchema(avroSchemaWithoutIds)
+ .withConf(new Configuration())
+ .build()) {
+ for (Record record : records) {
+ GenericData.Record avroRecord = new
GenericData.Record(avroSchemaWithoutIds);
+ for (Types.NestedField field : schema.columns()) {
+ avroRecord.put(field.name(), record.getField(field.name()));
+ }
+
+ writer.write(avroRecord);
+ }
+ }
+
+ try (ParquetFileReader reader =
+
ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) {
+
assertThat(ParquetSchemaUtil.hasIds(reader.getFooter().getFileMetaData().getSchema()))
+ .isFalse();
+ }
+ }
+
+ private void writeOrcWithoutFieldIds(Schema schema, List<Record> records)
throws IOException {
+ TypeDescription typeWithIds = ORCSchemaUtil.convert(schema);
+ TypeDescription typeWithoutIds = TestORCSchemaUtil.removeIds(typeWithIds);
+
+ OutputFile outputFile = encryptedFile.encryptingOutputFile();
+ Path hadoopPath = new Path(outputFile.location());
+
+ Configuration conf = new Configuration();
+ OrcFile.WriterOptions options =
+ OrcFile.writerOptions(conf)
+ .useUTCTimestamp(true)
+ .setSchema(typeWithoutIds)
+ .fileSystem(OrcWritingTestUtils.outputFileSystem(outputFile));
+
+ OrcRowWriter<Record> rowWriter = GenericOrcWriter.buildWriter(schema,
typeWithIds);
+
+ try (Writer orcWriter = OrcFile.createWriter(hadoopPath, options)) {
+ VectorizedRowBatch batch = typeWithoutIds.createRowBatch();
+ for (Record record : records) {
+ rowWriter.write(record, batch);
+ if (batch.size == batch.getMaxSize()) {
+ orcWriter.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+
+ if (batch.size > 0) {
+ orcWriter.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+
+ InputFile inputFile = outputFile.toInputFile();
+ OrcFile.ReaderOptions readerOptions =
+ OrcFile.readerOptions(conf)
+ .useUTCTimestamp(true)
+ .filesystem(OrcWritingTestUtils.inputFileSystem(inputFile))
+ .maxLength(inputFile.getLength());
+
+ try (Reader reader = OrcFile.createReader(hadoopPath, readerOptions)) {
+ assertThat(TestORCSchemaUtil.hasIds(reader.getSchema())).isFalse();
+ }
+ }
+
+ private void runTypePromotionCheck(
+ FileFormat fileFormat, Type fromType, Type toType, Function<Object,
Object> promoteValue)
+ throws IOException {
+ String columnName = "col";
+ Schema writeSchema = new Schema(Types.NestedField.required(1, columnName,
fromType));
+ Schema readSchema = new Schema(Types.NestedField.required(1, columnName,
toType));
+
+ List<Record> genericRecords = RandomGenericData.generate(writeSchema, 10,
1L);
+ writeGenericRecords(fileFormat, writeSchema, genericRecords);
+
+ readAndAssertEngineRecords(
+ fileFormat,
+ readSchema,
+ genericRecords,
+ record -> {
+ Record expected = GenericRecord.create(readSchema);
+ expected.setField(columnName,
promoteValue.apply(record.getField(columnName)));
+ return expected;
+ });
+ }
+
+ private void readAndAssertEngineRecords(
+ FileFormat fileFormat,
+ Schema readSchema,
+ List<Record> sourceRecords,
+ Function<Record, Record> converter)
+ throws IOException {
+ List<Record> expectedGenericRecords =
sourceRecords.stream().map(converter).toList();
+ InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+ List<T> readRecords;
+ try (CloseableIterable<T> reader =
+ FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+ .project(readSchema)
+ .build()) {
+ readRecords = ImmutableList.copyOf(reader);
+ }
+
+ assertThat(readRecords).hasSize(expectedGenericRecords.size());
+ assertEquals(
+ readSchema, convertToEngineRecords(expectedGenericRecords,
readSchema), readRecords);
+ }
}
diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle
index 467b0fa8c9..41f2489c80 100644
--- a/flink/v1.20/build.gradle
+++ b/flink/v1.20/build.gradle
@@ -84,6 +84,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-orc', configuration:
'testArtifacts')
+ testImplementation(testFixtures(project(':iceberg-parquet')))
// By default, hive-exec is a fat/uber jar and it exports a guava library
// that's really old. We use the core classifier to be able to override
our guava
diff --git a/flink/v2.0/build.gradle b/flink/v2.0/build.gradle
index f80a312421..7bc37b30e5 100644
--- a/flink/v2.0/build.gradle
+++ b/flink/v2.0/build.gradle
@@ -84,6 +84,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-orc', configuration:
'testArtifacts')
+ testImplementation(testFixtures(project(':iceberg-parquet')))
// By default, hive-exec is a fat/uber jar and it exports a guava library
// that's really old. We use the core classifier to be able to override
our guava
diff --git a/flink/v2.1/build.gradle b/flink/v2.1/build.gradle
index 451f144147..f93b61646e 100644
--- a/flink/v2.1/build.gradle
+++ b/flink/v2.1/build.gradle
@@ -84,6 +84,8 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-orc', configuration:
'testArtifacts')
+ testImplementation(testFixtures(project(':iceberg-parquet')))
// By default, hive-exec is a fat/uber jar and it exports a guava library
// that's really old. We use the core classifier to be able to override
our guava
diff --git a/orc/src/test/java/org/apache/iceberg/orc/OrcWritingTestUtils.java b/orc/src/test/java/org/apache/iceberg/orc/OrcWritingTestUtils.java
new file mode 100644
index 0000000000..72ed03ce2c
--- /dev/null
+++ b/orc/src/test/java/org/apache/iceberg/orc/OrcWritingTestUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.orc;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+
+public class OrcWritingTestUtils {
+ private OrcWritingTestUtils() {}
+
+ public static FileSystem outputFileSystem(OutputFile file) {
+ return new FileIOFSUtil.OutputFileSystem(file);
+ }
+
+ public static FileSystem inputFileSystem(InputFile file) {
+ return new FileIOFSUtil.InputFileSystem(file);
+ }
+}
diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
index c19e36be3a..e331ca94a2 100644
--- a/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
+++ b/orc/src/test/java/org/apache/iceberg/orc/TestORCSchemaUtil.java
@@ -560,4 +560,12 @@ public class TestORCSchemaUtil {
return true;
}
+
+ public static TypeDescription removeIds(TypeDescription type) {
+ return ORCSchemaUtil.removeIds(type);
+ }
+
+ public static boolean hasIds(TypeDescription orcSchema) {
+ return ORCSchemaUtil.hasIds(orcSchema);
+ }
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
index b8cd38f56d..441073d34a 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -35,7 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.schema.MessageType;
/** Utilities for tests that need to write Parquet files. */
-class ParquetWritingTestUtils {
+public class ParquetWritingTestUtils {
private ParquetWritingTestUtils() {}
diff --git a/parquet/src/testFixtures/java/org/apache/iceberg/parquet/ParquetFileTestUtils.java b/parquet/src/testFixtures/java/org/apache/iceberg/parquet/ParquetFileTestUtils.java
new file mode 100644
index 0000000000..a6055424c0
--- /dev/null
+++ b/parquet/src/testFixtures/java/org/apache/iceberg/parquet/ParquetFileTestUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+
+/** Utilities for tests that need to write Parquet files. */
+public class ParquetFileTestUtils {
+
+ private ParquetFileTestUtils() {}
+
+ public static OutputFile file(org.apache.iceberg.io.OutputFile file) {
+ return ParquetIO.file(file);
+ }
+
+ public static InputFile file(org.apache.iceberg.io.InputFile file) {
+ return ParquetIO.file(file);
+ }
+}
diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle
index ead4a32f49..57e4853177 100644
--- a/spark/v3.4/build.gradle
+++ b/spark/v3.4/build.gradle
@@ -105,8 +105,10 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-orc', configuration:
'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration:
'testFixturesRuntimeElements'))
testImplementation libs.awaitility
+ testImplementation(testFixtures(project(':iceberg-parquet')))
}
test {
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index a69b78e5ad..68bdb1c21a 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -105,6 +105,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-orc', configuration:
'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration:
'testFixturesRuntimeElements'))
testImplementation libs.awaitility
testImplementation(testFixtures(project(':iceberg-parquet')))
diff --git a/spark/v4.0/build.gradle b/spark/v4.0/build.gradle
index ba2e0fd4ba..3707e01e48 100644
--- a/spark/v4.0/build.gradle
+++ b/spark/v4.0/build.gradle
@@ -105,6 +105,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-orc', configuration:
'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration:
'testFixturesRuntimeElements'))
testImplementation libs.awaitility
testImplementation(testFixtures(project(':iceberg-parquet')))
diff --git a/spark/v4.1/build.gradle b/spark/v4.1/build.gradle
index 02e4323e70..e6455fa34f 100644
--- a/spark/v4.1/build.gradle
+++ b/spark/v4.1/build.gradle
@@ -105,6 +105,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
testImplementation project(path: ':iceberg-api', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration:
'testArtifacts')
testImplementation project(path: ':iceberg-data', configuration:
'testArtifacts')
+ testImplementation project(path: ':iceberg-orc', configuration:
'testArtifacts')
testImplementation (project(path: ':iceberg-open-api', configuration:
'testFixturesRuntimeElements'))
testImplementation libs.awaitility
testImplementation(testFixtures(project(':iceberg-parquet')))