This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b4eddbc155 Data, Spark, Flink: Add TCK for File Format API (#15441)
b4eddbc155 is described below
commit b4eddbc1557a91698473bad6fee7f58c6edd7708
Author: Joy Haldar <[email protected]>
AuthorDate: Fri Mar 13 16:28:41 2026 +0530
Data, Spark, Flink: Add TCK for File Format API (#15441)
---
.../apache/iceberg/data/BaseFormatModelTests.java | 323 +++++++++++++++++++++
.../org/apache/iceberg/data/DataGenerator.java | 34 +++
.../org/apache/iceberg/data/DataGenerators.java | 52 ++++
.../iceberg/data/TestGenericFormatModels.java | 205 -------------
.../iceberg/flink/data/TestFlinkFormatModel.java | 51 ++++
.../iceberg/spark/data/InternalRowConverter.java | 115 ++++++++
.../iceberg/spark/data/TestSparkFormatModel.java | 54 ++++
7 files changed, 629 insertions(+), 205 deletions(-)
diff --git a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
new file mode 100644
index 0000000000..e9a7f5e0c3
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.FieldSource;
+
+/**
+ * Technology compatibility kit (TCK) for the File Format API. Subclasses supply an engine row type
+ * {@code T}. Every test writes a file through {@link FormatModelRegistry} with one representation
+ * (engine rows or generic {@link Record}s) and reads it back with the other, for Avro, Parquet
+ * and ORC.
+ */
+public abstract class BaseFormatModelTests<T> {
+
+  /** Returns the engine row class used to look up the engine's writers and readers. */
+  protected abstract Class<T> engineType();
+
+  /** Returns the engine-native schema passed to the writer builder's {@code engineSchema}. */
+  protected abstract Object engineSchema(Schema schema);
+
+  /** Converts a generic {@link Record} of the given schema into an engine row. */
+  protected abstract T convertToEngine(Record record, Schema schema);
+
+  /** Asserts that the {@code actual} engine rows match {@code expected} for the given schema. */
+  protected abstract void assertEquals(Schema schema, List<T> expected, List<T> actual);
+
+  private static final FileFormat[] FILE_FORMATS =
+      new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
+
+  // Every (file format, data generator) combination; consumed by @FieldSource below.
+  private static final List<Arguments> FORMAT_AND_GENERATOR =
+      Arrays.stream(FILE_FORMATS)
+          .flatMap(
+              format ->
+                  Arrays.stream(DataGenerators.ALL)
+                      .map(generator -> Arguments.of(format, generator)))
+          .toList();
+
+  @TempDir protected Path temp;
+
+  private InMemoryFileIO fileIO;
+  private EncryptedOutputFile encryptedFile;
+
+  @BeforeEach
+  void before() {
+    this.fileIO = new InMemoryFileIO();
+    this.encryptedFile =
+        EncryptedFiles.encryptedOutput(
+            fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY);
+  }
+
+  @AfterEach
+  void after() {
+    // Check for null before dereferencing, and close the FileIO even if deleting the file fails.
+    try {
+      if (fileIO != null && encryptedFile != null) {
+        fileIO.deleteFile(encryptedFile.encryptingOutputFile());
+      }
+    } finally {
+      this.encryptedFile = null;
+      if (fileIO != null) {
+        fileIO.close();
+      }
+    }
+  }
+
+  /** Write with engine type T, read with Generic Record. */
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  void testDataWriterEngineWriteGenericRead(FileFormat fileFormat, DataGenerator dataGenerator)
+      throws IOException {
+    Schema schema = dataGenerator.schema();
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    List<T> engineRecords = convertToEngineRecords(genericRecords, schema);
+
+    FileWriterBuilder<DataWriter<T>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), encryptedFile);
+
+    DataWriter<T> writer =
+        writerBuilder
+            .schema(schema)
+            .engineSchema(engineSchema(schema))
+            .spec(PartitionSpec.unpartitioned())
+            .build();
+
+    try (writer) {
+      for (T record : engineRecords) {
+        writer.write(record);
+      }
+    }
+
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size());
+    assertThat(dataFile.format()).isEqualTo(fileFormat);
+
+    DataTestHelpers.assertEquals(
+        schema.asStruct(), genericRecords, readGenericRecords(fileFormat, schema));
+  }
+
+  /** Write with Generic Record, read with engine type T. */
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator dataGenerator)
+      throws IOException {
+    Schema schema = dataGenerator.schema();
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    FileWriterBuilder<DataWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    DataWriter<Record> writer =
+        writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build();
+
+    try (writer) {
+      for (Record record : genericRecords) {
+        writer.write(record);
+      }
+    }
+
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size());
+    assertThat(dataFile.format()).isEqualTo(fileFormat);
+
+    assertEquals(
+        schema,
+        convertToEngineRecords(genericRecords, schema),
+        readEngineRecords(fileFormat, schema));
+  }
+
+  /** Write with engine type T, read with Generic Record. */
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  void testEqualityDeleteWriterEngineWriteGenericRead(
+      FileFormat fileFormat, DataGenerator dataGenerator) throws IOException {
+    Schema schema = dataGenerator.schema();
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    List<T> engineRecords = convertToEngineRecords(genericRecords, schema);
+
+    FileWriterBuilder<EqualityDeleteWriter<T>, Object> writerBuilder =
+        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, engineType(), encryptedFile);
+
+    // Field id 1 is the equality key, so every DataGenerator schema is expected to define a
+    // top-level field with id 1 (row_id in DataGenerators.StructOfPrimitive).
+    EqualityDeleteWriter<T> writer =
+        writerBuilder
+            .schema(schema)
+            .engineSchema(engineSchema(schema))
+            .spec(PartitionSpec.unpartitioned())
+            .equalityFieldIds(1)
+            .build();
+
+    try (writer) {
+      for (T record : engineRecords) {
+        writer.write(record);
+      }
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(engineRecords.size());
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+    assertThat(deleteFile.equalityFieldIds()).containsExactly(1);
+
+    DataTestHelpers.assertEquals(
+        schema.asStruct(), genericRecords, readGenericRecords(fileFormat, schema));
+  }
+
+  /** Write with Generic Record, read with engine type T. */
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  void testEqualityDeleteWriterGenericWriteEngineRead(
+      FileFormat fileFormat, DataGenerator dataGenerator) throws IOException {
+    Schema schema = dataGenerator.schema();
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    FileWriterBuilder<EqualityDeleteWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile);
+
+    // Field id 1 is the equality key; see testEqualityDeleteWriterEngineWriteGenericRead.
+    EqualityDeleteWriter<Record> writer =
+        writerBuilder
+            .schema(schema)
+            .spec(PartitionSpec.unpartitioned())
+            .equalityFieldIds(1)
+            .build();
+
+    try (writer) {
+      for (Record record : genericRecords) {
+        writer.write(record);
+      }
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(genericRecords.size());
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+    assertThat(deleteFile.equalityFieldIds()).containsExactly(1);
+
+    assertEquals(
+        schema,
+        convertToEngineRecords(genericRecords, schema),
+        readEngineRecords(fileFormat, schema));
+  }
+
+  /** Write position deletes, read with Generic Record. */
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  void testPositionDeleteWriterEngineWriteGenericRead(FileFormat fileFormat) throws IOException {
+    Schema positionDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+
+    FileWriterBuilder<PositionDeleteWriter<T>, ?> writerBuilder =
+        FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile);
+
+    List<PositionDelete<T>> deletes =
+        ImmutableList.of(
+            PositionDelete.<T>create().set("data-file-1.parquet", 0L),
+            PositionDelete.<T>create().set("data-file-1.parquet", 1L));
+
+    // Expected read-back: one (file_path, pos) record per delete.
+    List<Record> records =
+        deletes.stream()
+            .map(
+                d ->
+                    GenericRecord.create(positionDeleteSchema)
+                        .copy(DELETE_FILE_PATH.name(), d.path(), DELETE_FILE_POS.name(), d.pos()))
+            .toList();
+
+    PositionDeleteWriter<T> writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build();
+    try (writer) {
+      for (PositionDelete<T> delete : deletes) {
+        writer.write(delete);
+      }
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(2);
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+
+    DataTestHelpers.assertEquals(
+        positionDeleteSchema.asStruct(),
+        records,
+        readGenericRecords(fileFormat, positionDeleteSchema));
+  }
+
+  /** Reads the file written by the current test as generic records projected onto {@code schema}. */
+  private List<Record> readGenericRecords(FileFormat fileFormat, Schema schema)
+      throws IOException {
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    try (CloseableIterable<Record> reader =
+        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+            .project(schema)
+            .build()) {
+      return ImmutableList.copyOf(reader);
+    }
+  }
+
+  /** Reads the file written by the current test as engine rows projected onto {@code schema}. */
+  private List<T> readEngineRecords(FileFormat fileFormat, Schema schema) throws IOException {
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(schema)
+            .build()) {
+      return ImmutableList.copyOf(reader);
+    }
+  }
+
+  /** Converts each generic record to an engine row using {@link #convertToEngine}. */
+  private List<T> convertToEngineRecords(List<Record> records, Schema schema) {
+    return records.stream().map(r -> convertToEngine(r, schema)).collect(Collectors.toList());
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/data/DataGenerator.java b/data/src/test/java/org/apache/iceberg/data/DataGenerator.java
new file mode 100644
index 0000000000..4fa6bb04e1
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/DataGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+
+/** Interface for generating test data with different schema types. */
+interface DataGenerator {
+
+  /** Returns the Iceberg schema for this test data. */
+  Schema schema();
+
+  /** Generates 10 test records for {@link #schema()} using RandomGenericData with seed 1. */
+  default List<Record> generateRecords() {
+    return generateRecords(10, 1L);
+  }
+
+  /**
+   * Generates test records for {@link #schema()} using {@link RandomGenericData}.
+   *
+   * @param numRecords number of records to generate
+   * @param seed seed passed to {@link RandomGenericData}
+   * @return the generated records
+   */
+  default List<Record> generateRecords(int numRecords, long seed) {
+    return RandomGenericData.generate(schema(), numRecords, seed);
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/data/DataGenerators.java b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java
new file mode 100644
index 0000000000..ef1caad7f7
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Test data generators for different schema types. Add new generators to the {@code ALL} array to
+ * include them in the format model tests.
+ */
+final class DataGenerators {
+
+  static final DataGenerator[] ALL = new DataGenerator[] {new StructOfPrimitive()};
+
+  private DataGenerators() {}
+
+  /** A required string column plus a required struct holding two required primitive fields. */
+  static class StructOfPrimitive implements DataGenerator {
+    private final Schema schema =
+        new Schema(
+            required(1, "row_id", Types.StringType.get()),
+            required(
+                2,
+                "struct_of_primitive",
+                Types.StructType.of(
+                    required(101, "id", Types.IntegerType.get()),
+                    required(102, "name", Types.StringType.get()))));
+
+    @Override
+    public Schema schema() {
+      return schema;
+    }
+
+    // Parameterized tests render arguments with toString(); give them a readable, stable name.
+    @Override
+    public String toString() {
+      return "StructOfPrimitive";
+    }
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
deleted file mode 100644
index d2bf1cb3e8..0000000000
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.data;
-
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.List;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.TestBase;
-import org.apache.iceberg.deletes.EqualityDeleteWriter;
-import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.deletes.PositionDeleteWriter;
-import org.apache.iceberg.encryption.EncryptedFiles;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
-import org.apache.iceberg.encryption.EncryptionKeyMetadata;
-import org.apache.iceberg.formats.FileWriterBuilder;
-import org.apache.iceberg.formats.FormatModelRegistry;
-import org.apache.iceberg.inmemory.InMemoryFileIO;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.FieldSource;
-
-public class TestGenericFormatModels {
-  private static final List<Record> TEST_RECORDS =
-      RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
-
-  private static final FileFormat[] FILE_FORMATS =
-      new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
-
-  @TempDir protected Path temp;
-
-  private InMemoryFileIO fileIO;
-  private EncryptedOutputFile encryptedFile;
-
-  @BeforeEach
-  public void before() {
-    this.fileIO = new InMemoryFileIO();
-    this.encryptedFile =
-        EncryptedFiles.encryptedOutput(
-            fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY);
-  }
-
-  @AfterEach
-  public void after() throws IOException {
-    fileIO.deleteFile(encryptedFile.encryptingOutputFile());
-    this.encryptedFile = null;
-    if (fileIO != null) {
-      fileIO.close();
-    }
-  }
-
-  @ParameterizedTest
-  @FieldSource("FILE_FORMATS")
-  public void testDataWriterRoundTrip(FileFormat fileFormat) throws IOException {
-    FileWriterBuilder<DataWriter<Record>, Schema> writerBuilder =
-        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
-
-    DataFile dataFile;
-    DataWriter<Record> writer =
-        writerBuilder.schema(TestBase.SCHEMA).spec(PartitionSpec.unpartitioned()).build();
-    try (writer) {
-      for (Record record : TEST_RECORDS) {
-        writer.write(record);
-      }
-    }
-
-    dataFile = writer.toDataFile();
-
-    assertThat(dataFile).isNotNull();
-    assertThat(dataFile.recordCount()).isEqualTo(TEST_RECORDS.size());
-    assertThat(dataFile.format()).isEqualTo(fileFormat);
-
-    // Verify the file content by reading it back
-    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
-    List<Record> readRecords;
-    try (CloseableIterable<Record> reader =
-        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(TestBase.SCHEMA)
-            .reuseContainers()
-            .build()) {
-      readRecords = ImmutableList.copyOf(CloseableIterable.transform(reader, Record::copy));
-    }
-
-    DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords);
-  }
-
-  @ParameterizedTest
-  @FieldSource("FILE_FORMATS")
-  public void testEqualityDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException {
-    FileWriterBuilder<EqualityDeleteWriter<Record>, Schema> writerBuilder =
-        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile);
-
-    DeleteFile deleteFile;
-    EqualityDeleteWriter<Record> writer =
-        writerBuilder
-            .schema(TestBase.SCHEMA)
-            .spec(PartitionSpec.unpartitioned())
-            .equalityFieldIds(3)
-            .build();
-    try (writer) {
-      for (Record record : TEST_RECORDS) {
-        writer.write(record);
-      }
-    }
-
-    deleteFile = writer.toDeleteFile();
-
-    assertThat(deleteFile).isNotNull();
-    assertThat(deleteFile.recordCount()).isEqualTo(TEST_RECORDS.size());
-    assertThat(deleteFile.format()).isEqualTo(fileFormat);
-    assertThat(deleteFile.equalityFieldIds()).containsExactly(3);
-
-    // Verify the file content by reading it back
-    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
-    List<Record> readRecords;
-    try (CloseableIterable<Record> reader =
-        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(TestBase.SCHEMA)
-            .build()) {
-      readRecords = ImmutableList.copyOf(reader);
-    }
-
-    DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, readRecords);
-  }
-
-  @ParameterizedTest
-  @FieldSource("FILE_FORMATS")
-  public void testPositionDeleteWriterRoundTrip(FileFormat fileFormat) throws IOException {
-    Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS);
-
-    FileWriterBuilder<PositionDeleteWriter<Record>, ?> writerBuilder =
-        FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile);
-
-    PositionDelete<Record> delete1 = PositionDelete.create();
-    delete1.set("data-file-1.parquet", 0L);
-
-    PositionDelete<Record> delete2 = PositionDelete.create();
-    delete2.set("data-file-1.parquet", 1L);
-
-    List<PositionDelete<Record>> positionDeletes = ImmutableList.of(delete1, delete2);
-
-    DeleteFile deleteFile;
-    PositionDeleteWriter<Record> writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build();
-    try (writer) {
-      for (PositionDelete<Record> delete : positionDeletes) {
-        writer.write(delete);
-      }
-    }
-
-    deleteFile = writer.toDeleteFile();
-
-    assertThat(deleteFile).isNotNull();
-    assertThat(deleteFile.recordCount()).isEqualTo(2);
-    assertThat(deleteFile.format()).isEqualTo(fileFormat);
-
-    // Verify the file content by reading it back
-    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
-    List<Record> readRecords;
-    try (CloseableIterable<Record> reader =
-        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(positionDeleteSchema)
-            .build()) {
-      readRecords = ImmutableList.copyOf(reader);
-    }
-
-    List<Record> expected =
-        ImmutableList.of(
-            GenericRecord.create(positionDeleteSchema)
-                .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 0L),
-            GenericRecord.create(positionDeleteSchema)
-                .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", DELETE_FILE_POS.name(), 1L));
-
-    DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), expected, readRecords);
-  }
-}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
new file mode 100644
index 0000000000..8c99fdf521
--- /dev/null
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestHelpers;
+
+/** Runs {@link BaseFormatModelTests} with Flink {@link RowData} as the engine row type. */
+public class TestFlinkFormatModel extends BaseFormatModelTests<RowData> {
+
+  @Override
+  protected Class<RowData> engineType() {
+    return RowData.class;
+  }
+
+  /** Derives the Flink-side schema from the Iceberg schema via {@code FlinkSchemaUtil}. */
+  @Override
+  protected Object engineSchema(Schema schema) {
+    return FlinkSchemaUtil.convert(schema);
+  }
+
+  /** Delegates record conversion to {@code RowDataConverter}. */
+  @Override
+  protected RowData convertToEngine(Record record, Schema schema) {
+    return RowDataConverter.convert(schema, record);
+  }
+
+  @Override
+  protected void assertEquals(Schema schema, List<RowData> expected, List<RowData> actual) {
+    var rowType = FlinkSchemaUtil.convert(schema);
+    TestHelpers.assertRows(actual, expected, rowType);
+  }
+}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
new file mode 100644
index 0000000000..b7ba7a3430
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/** Converts Iceberg Record to Spark InternalRow for testing. */
+public class InternalRowConverter {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private InternalRowConverter() {}
+
+  /**
+   * Converts a generic Iceberg record into a Spark {@link InternalRow}.
+   *
+   * @param schema the Iceberg schema describing {@code record}
+   * @param record the record to convert; its fields are read by position
+   * @return a {@link GenericInternalRow} holding Spark's internal value for each field
+   * @throws UnsupportedOperationException if the schema contains a type with no conversion here
+   */
+  public static InternalRow convert(Schema schema, Record record) {
+    return convert(schema.asStruct(), record);
+  }
+
+  private static InternalRow convert(Types.StructType struct, Record record) {
+    List<Types.NestedField> fields = struct.fields();
+    GenericInternalRow internalRow = new GenericInternalRow(fields.size());
+    for (int i = 0; i < fields.size(); i += 1) {
+      internalRow.update(i, convert(fields.get(i).type(), record.get(i)));
+    }
+
+    return internalRow;
+  }
+
+  /** Converts one Iceberg generic value to Spark's internal representation; null stays null. */
+  private static Object convert(Type type, Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    return switch (type.typeId()) {
+      case BOOLEAN, INTEGER, LONG, FLOAT, DOUBLE -> value;
+      case DATE -> (int) ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) value);
+      case TIMESTAMP ->
+          ((Types.TimestampType) type).shouldAdjustToUTC()
+              ? ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) value)
+              : ChronoUnit.MICROS.between(EPOCH, ((LocalDateTime) value).atZone(ZoneId.of("UTC")));
+      case STRING -> UTF8String.fromString((String) value);
+      case UUID -> UTF8String.fromString(value.toString());
+      case FIXED, BINARY -> toByteArray(value);
+      case DECIMAL -> Decimal.apply((BigDecimal) value);
+      case STRUCT -> convert((Types.StructType) type, (Record) value);
+      case LIST -> convertList(type.asListType(), (List<?>) value);
+      case MAP -> convertMap(type.asMapType(), (Map<?, ?>) value);
+      // TIME is not supported by Spark, VARIANT not yet implemented
+      default ->
+          throw new UnsupportedOperationException(
+              "Unsupported type for conversion to InternalRow: " + type);
+    };
+  }
+
+  /**
+   * Copies the readable bytes of a FIXED/BINARY value, which may be a {@code byte[]} or a {@link
+   * ByteBuffer}. The buffer is read through a duplicate so its position is untouched; unlike
+   * indexing {@code array()}, this honors a non-zero position and also works for read-only or
+   * direct buffers that expose no backing array.
+   */
+  private static byte[] toByteArray(Object value) {
+    if (value instanceof byte[] array) {
+      return Arrays.copyOf(array, array.length);
+    }
+
+    ByteBuffer buffer = ((ByteBuffer) value).duplicate();
+    byte[] copy = new byte[buffer.remaining()];
+    buffer.get(copy);
+    return copy;
+  }
+
+  /** Converts each list element with the list's element type. */
+  private static GenericArrayData convertList(Types.ListType listType, List<?> list) {
+    Type elementType = listType.elementType();
+    return new GenericArrayData(
+        list.stream().map(element -> convert(elementType, element)).toArray());
+  }
+
+  /** Converts keys and values in a single pass over the entries so the two arrays stay aligned. */
+  private static ArrayBasedMapData convertMap(Types.MapType mapType, Map<?, ?> map) {
+    Object[] keys = new Object[map.size()];
+    Object[] values = new Object[map.size()];
+    int index = 0;
+    for (Map.Entry<?, ?> entry : map.entrySet()) {
+      keys[index] = convert(mapType.keyType(), entry.getKey());
+      values[index] = convert(mapType.valueType(), entry.getValue());
+      index += 1;
+    }
+
+    return new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values));
+  }
+}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
new file mode 100644
index 0000000000..c18e4c053f
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+/** Runs {@link BaseFormatModelTests} with Spark {@link InternalRow} as the engine row type. */
+public class TestSparkFormatModel extends BaseFormatModelTests<InternalRow> {
+
+  @Override
+  protected Class<InternalRow> engineType() {
+    return InternalRow.class;
+  }
+
+  /** Derives the Spark-side schema from the Iceberg schema via {@code SparkSchemaUtil}. */
+  @Override
+  protected Object engineSchema(Schema schema) {
+    return SparkSchemaUtil.convert(schema);
+  }
+
+  /** Delegates record conversion to {@link InternalRowConverter}. */
+  @Override
+  protected InternalRow convertToEngine(Record record, Schema schema) {
+    return InternalRowConverter.convert(schema, record);
+  }
+
+  /** Checks the row counts match, then compares rows pairwise in order. */
+  @Override
+  protected void assertEquals(
+      Schema schema, List<InternalRow> expected, List<InternalRow> actual) {
+    assertThat(actual).hasSameSizeAs(expected);
+    int rowCount = expected.size();
+    for (int row = 0; row < rowCount; row += 1) {
+      InternalRow expectedRow = expected.get(row);
+      InternalRow actualRow = actual.get(row);
+      TestHelpers.assertEquals(schema, expectedRow, actualRow);
+    }
+  }
+}