This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b4eddbc155 Data, Spark, Flink: Add TCK for File Format API (#15441)
b4eddbc155 is described below

commit b4eddbc1557a91698473bad6fee7f58c6edd7708
Author: Joy Haldar <[email protected]>
AuthorDate: Fri Mar 13 16:28:41 2026 +0530

    Data, Spark, Flink: Add TCK for File Format API (#15441)
---
 .../apache/iceberg/data/BaseFormatModelTests.java  | 323 +++++++++++++++++++++
 .../org/apache/iceberg/data/DataGenerator.java     |  34 +++
 .../org/apache/iceberg/data/DataGenerators.java    |  52 ++++
 .../iceberg/data/TestGenericFormatModels.java      | 205 -------------
 .../iceberg/flink/data/TestFlinkFormatModel.java   |  51 ++++
 .../iceberg/spark/data/InternalRowConverter.java   | 115 ++++++++
 .../iceberg/spark/data/TestSparkFormatModel.java   |  54 ++++
 7 files changed, 629 insertions(+), 205 deletions(-)

diff --git 
a/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java 
b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
new file mode 100644
index 0000000000..e9a7f5e0c3
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
+import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.formats.FileWriterBuilder;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.FieldSource;
+
+public abstract class BaseFormatModelTests<T> {
+
+  protected abstract Class<T> engineType();
+
+  protected abstract Object engineSchema(Schema schema);
+
+  protected abstract T convertToEngine(Record record, Schema schema);
+
+  protected abstract void assertEquals(Schema schema, List<T> expected, 
List<T> actual);
+
+  private static final FileFormat[] FILE_FORMATS =
+      new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
+
+  private static final List<Arguments> FORMAT_AND_GENERATOR =
+      Arrays.stream(FILE_FORMATS)
+          .flatMap(
+              format ->
+                  Arrays.stream(DataGenerators.ALL)
+                      .map(generator -> Arguments.of(format, generator)))
+          .toList();
+
+  @TempDir protected Path temp;
+
+  private InMemoryFileIO fileIO;
+  private EncryptedOutputFile encryptedFile;
+
+  @BeforeEach
+  void before() {
+    this.fileIO = new InMemoryFileIO();
+    this.encryptedFile =
+        EncryptedFiles.encryptedOutput(
+            fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY);
+  }
+
+  @AfterEach
+  void after() {
+    fileIO.deleteFile(encryptedFile.encryptingOutputFile());
+    this.encryptedFile = null;
+    if (fileIO != null) {
+      fileIO.close();
+    }
+  }
+
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  /** Write with engine type T, read with Generic Record */
+  void testDataWriterEngineWriteGenericRead(FileFormat fileFormat, 
DataGenerator dataGenerator)
+      throws IOException {
+    Schema schema = dataGenerator.schema();
+    FileWriterBuilder<DataWriter<T>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, engineType(), 
encryptedFile);
+
+    DataWriter<T> writer =
+        writerBuilder
+            .schema(schema)
+            .engineSchema(engineSchema(schema))
+            .spec(PartitionSpec.unpartitioned())
+            .build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    List<T> engineRecords = convertToEngineRecords(genericRecords, schema);
+
+    try (writer) {
+      for (T record : engineRecords) {
+        writer.write(record);
+      }
+    }
+
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(engineRecords.size());
+    assertThat(dataFile.format()).isEqualTo(fileFormat);
+
+    // Read back and verify
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<Record> readRecords;
+    try (CloseableIterable<Record> reader =
+        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+            .project(schema)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    DataTestHelpers.assertEquals(schema.asStruct(), genericRecords, 
readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  /** Write with Generic Record, read with engine type T */
+  void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, 
DataGenerator dataGenerator)
+      throws IOException {
+    Schema schema = dataGenerator.schema();
+    FileWriterBuilder<DataWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, 
encryptedFile);
+
+    DataWriter<Record> writer =
+        
writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    try (writer) {
+      for (Record record : genericRecords) {
+        writer.write(record);
+      }
+    }
+
+    DataFile dataFile = writer.toDataFile();
+
+    assertThat(dataFile).isNotNull();
+    assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size());
+    assertThat(dataFile.format()).isEqualTo(fileFormat);
+
+    // Read back and verify
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(schema)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    assertEquals(schema, convertToEngineRecords(genericRecords, schema), 
readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  /** Write with engine type T, read with Generic Record */
+  void testEqualityDeleteWriterEngineWriteGenericRead(
+      FileFormat fileFormat, DataGenerator dataGenerator) throws IOException {
+    Schema schema = dataGenerator.schema();
+    FileWriterBuilder<EqualityDeleteWriter<T>, Object> writerBuilder =
+        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, 
engineType(), encryptedFile);
+
+    EqualityDeleteWriter<T> writer =
+        writerBuilder
+            .schema(schema)
+            .engineSchema(engineSchema(schema))
+            .spec(PartitionSpec.unpartitioned())
+            .equalityFieldIds(1)
+            .build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+    List<T> engineRecords = convertToEngineRecords(genericRecords, schema);
+
+    try (writer) {
+      for (T record : engineRecords) {
+        writer.write(record);
+      }
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(engineRecords.size());
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+    assertThat(deleteFile.equalityFieldIds()).containsExactly(1);
+
+    // Read back and verify
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<Record> readRecords;
+    try (CloseableIterable<Record> reader =
+        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+            .project(schema)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    DataTestHelpers.assertEquals(schema.asStruct(), genericRecords, 
readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FORMAT_AND_GENERATOR")
+  /** Write with Generic Record, read with engine type T */
+  void testEqualityDeleteWriterGenericWriteEngineRead(
+      FileFormat fileFormat, DataGenerator dataGenerator) throws IOException {
+    Schema schema = dataGenerator.schema();
+    FileWriterBuilder<EqualityDeleteWriter<Record>, Object> writerBuilder =
+        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, 
Record.class, encryptedFile);
+
+    EqualityDeleteWriter<Record> writer =
+        writerBuilder
+            .schema(schema)
+            .spec(PartitionSpec.unpartitioned())
+            .equalityFieldIds(1)
+            .build();
+
+    List<Record> genericRecords = dataGenerator.generateRecords();
+
+    try (writer) {
+      for (Record record : genericRecords) {
+        writer.write(record);
+      }
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(genericRecords.size());
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+    assertThat(deleteFile.equalityFieldIds()).containsExactly(1);
+
+    // Read back and verify
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<T> readRecords;
+    try (CloseableIterable<T> reader =
+        FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
+            .project(schema)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    assertEquals(schema, convertToEngineRecords(genericRecords, schema), 
readRecords);
+  }
+
+  @ParameterizedTest
+  @FieldSource("FILE_FORMATS")
+  /** Write position deletes, read with Generic Record */
+  void testPositionDeleteWriterEngineWriteGenericRead(FileFormat fileFormat) 
throws IOException {
+    Schema positionDeleteSchema = DeleteSchemaUtil.pathPosSchema();
+
+    FileWriterBuilder<PositionDeleteWriter<T>, ?> writerBuilder =
+        FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, 
encryptedFile);
+
+    List<PositionDelete<T>> deletes =
+        ImmutableList.of(
+            PositionDelete.<T>create().set("data-file-1.parquet", 0L),
+            PositionDelete.<T>create().set("data-file-1.parquet", 1L));
+
+    List<Record> records =
+        deletes.stream()
+            .map(
+                d ->
+                    GenericRecord.create(positionDeleteSchema)
+                        .copy(DELETE_FILE_PATH.name(), d.path(), 
DELETE_FILE_POS.name(), d.pos()))
+            .toList();
+
+    PositionDeleteWriter<T> writer = 
writerBuilder.spec(PartitionSpec.unpartitioned()).build();
+    try (writer) {
+      for (PositionDelete<T> delete : deletes) {
+        writer.write(delete);
+      }
+    }
+
+    DeleteFile deleteFile = writer.toDeleteFile();
+
+    assertThat(deleteFile).isNotNull();
+    assertThat(deleteFile.recordCount()).isEqualTo(2);
+    assertThat(deleteFile.format()).isEqualTo(fileFormat);
+
+    // Read back and verify
+    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
+    List<Record> readRecords;
+    try (CloseableIterable<Record> reader =
+        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
+            .project(positionDeleteSchema)
+            .build()) {
+      readRecords = ImmutableList.copyOf(reader);
+    }
+
+    DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), records, 
readRecords);
+  }
+
+  private List<T> convertToEngineRecords(List<Record> records, Schema schema) {
+    return records.stream().map(r -> convertToEngine(r, 
schema)).collect(Collectors.toList());
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/data/DataGenerator.java 
b/data/src/test/java/org/apache/iceberg/data/DataGenerator.java
new file mode 100644
index 0000000000..4fa6bb04e1
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/DataGenerator.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+
+/** Interface for generating test data with different schema types. */
+interface DataGenerator {
+
+  /** Returns the Iceberg schema for this test data. */
+  Schema schema();
+
+  /** Generates test records using RandomGenericData. */
+  default List<Record> generateRecords() {
+    return RandomGenericData.generate(schema(), 10, 1L);
+  }
+}
diff --git a/data/src/test/java/org/apache/iceberg/data/DataGenerators.java 
b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java
new file mode 100644
index 0000000000..ef1caad7f7
--- /dev/null
+++ b/data/src/test/java/org/apache/iceberg/data/DataGenerators.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+/**
+ * Test data generators for different schema types. Add new generators to ALL 
array to include them
+ * in format model tests.
+ */
+class DataGenerators {
+
+  static final DataGenerator[] ALL = new DataGenerator[] {new 
StructOfPrimitive()};
+
+  private DataGenerators() {}
+
+  static class StructOfPrimitive implements DataGenerator {
+    private final Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "row_id", Types.StringType.get()),
+            Types.NestedField.required(
+                2,
+                "struct_of_primitive",
+                Types.StructType.of(
+                    required(101, "id", Types.IntegerType.get()),
+                    required(102, "name", Types.StringType.get()))));
+
+    @Override
+    public Schema schema() {
+      return schema;
+    }
+  }
+}
diff --git 
a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java 
b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
deleted file mode 100644
index d2bf1cb3e8..0000000000
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.data;
-
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
-import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.List;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.TestBase;
-import org.apache.iceberg.deletes.EqualityDeleteWriter;
-import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.deletes.PositionDeleteWriter;
-import org.apache.iceberg.encryption.EncryptedFiles;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
-import org.apache.iceberg.encryption.EncryptionKeyMetadata;
-import org.apache.iceberg.formats.FileWriterBuilder;
-import org.apache.iceberg.formats.FormatModelRegistry;
-import org.apache.iceberg.inmemory.InMemoryFileIO;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.FieldSource;
-
-public class TestGenericFormatModels {
-  private static final List<Record> TEST_RECORDS =
-      RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
-
-  private static final FileFormat[] FILE_FORMATS =
-      new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};
-
-  @TempDir protected Path temp;
-
-  private InMemoryFileIO fileIO;
-  private EncryptedOutputFile encryptedFile;
-
-  @BeforeEach
-  public void before() {
-    this.fileIO = new InMemoryFileIO();
-    this.encryptedFile =
-        EncryptedFiles.encryptedOutput(
-            fileIO.newOutputFile("test-file"), EncryptionKeyMetadata.EMPTY);
-  }
-
-  @AfterEach
-  public void after() throws IOException {
-    fileIO.deleteFile(encryptedFile.encryptingOutputFile());
-    this.encryptedFile = null;
-    if (fileIO != null) {
-      fileIO.close();
-    }
-  }
-
-  @ParameterizedTest
-  @FieldSource("FILE_FORMATS")
-  public void testDataWriterRoundTrip(FileFormat fileFormat) throws 
IOException {
-    FileWriterBuilder<DataWriter<Record>, Schema> writerBuilder =
-        FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, 
encryptedFile);
-
-    DataFile dataFile;
-    DataWriter<Record> writer =
-        
writerBuilder.schema(TestBase.SCHEMA).spec(PartitionSpec.unpartitioned()).build();
-    try (writer) {
-      for (Record record : TEST_RECORDS) {
-        writer.write(record);
-      }
-    }
-
-    dataFile = writer.toDataFile();
-
-    assertThat(dataFile).isNotNull();
-    assertThat(dataFile.recordCount()).isEqualTo(TEST_RECORDS.size());
-    assertThat(dataFile.format()).isEqualTo(fileFormat);
-
-    // Verify the file content by reading it back
-    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
-    List<Record> readRecords;
-    try (CloseableIterable<Record> reader =
-        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(TestBase.SCHEMA)
-            .reuseContainers()
-            .build()) {
-      readRecords = ImmutableList.copyOf(CloseableIterable.transform(reader, 
Record::copy));
-    }
-
-    DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, 
readRecords);
-  }
-
-  @ParameterizedTest
-  @FieldSource("FILE_FORMATS")
-  public void testEqualityDeleteWriterRoundTrip(FileFormat fileFormat) throws 
IOException {
-    FileWriterBuilder<EqualityDeleteWriter<Record>, Schema> writerBuilder =
-        FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, 
Record.class, encryptedFile);
-
-    DeleteFile deleteFile;
-    EqualityDeleteWriter<Record> writer =
-        writerBuilder
-            .schema(TestBase.SCHEMA)
-            .spec(PartitionSpec.unpartitioned())
-            .equalityFieldIds(3)
-            .build();
-    try (writer) {
-      for (Record record : TEST_RECORDS) {
-        writer.write(record);
-      }
-    }
-
-    deleteFile = writer.toDeleteFile();
-
-    assertThat(deleteFile).isNotNull();
-    assertThat(deleteFile.recordCount()).isEqualTo(TEST_RECORDS.size());
-    assertThat(deleteFile.format()).isEqualTo(fileFormat);
-    assertThat(deleteFile.equalityFieldIds()).containsExactly(3);
-
-    // Verify the file content by reading it back
-    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
-    List<Record> readRecords;
-    try (CloseableIterable<Record> reader =
-        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(TestBase.SCHEMA)
-            .build()) {
-      readRecords = ImmutableList.copyOf(reader);
-    }
-
-    DataTestHelpers.assertEquals(TestBase.SCHEMA.asStruct(), TEST_RECORDS, 
readRecords);
-  }
-
-  @ParameterizedTest
-  @FieldSource("FILE_FORMATS")
-  public void testPositionDeleteWriterRoundTrip(FileFormat fileFormat) throws 
IOException {
-    Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, 
DELETE_FILE_POS);
-
-    FileWriterBuilder<PositionDeleteWriter<Record>, ?> writerBuilder =
-        FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, 
encryptedFile);
-
-    PositionDelete<Record> delete1 = PositionDelete.create();
-    delete1.set("data-file-1.parquet", 0L);
-
-    PositionDelete<Record> delete2 = PositionDelete.create();
-    delete2.set("data-file-1.parquet", 1L);
-
-    List<PositionDelete<Record>> positionDeletes = ImmutableList.of(delete1, 
delete2);
-
-    DeleteFile deleteFile;
-    PositionDeleteWriter<Record> writer = 
writerBuilder.spec(PartitionSpec.unpartitioned()).build();
-    try (writer) {
-      for (PositionDelete<Record> delete : positionDeletes) {
-        writer.write(delete);
-      }
-    }
-
-    deleteFile = writer.toDeleteFile();
-
-    assertThat(deleteFile).isNotNull();
-    assertThat(deleteFile.recordCount()).isEqualTo(2);
-    assertThat(deleteFile.format()).isEqualTo(fileFormat);
-
-    // Verify the file content by reading it back
-    InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
-    List<Record> readRecords;
-    try (CloseableIterable<Record> reader =
-        FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
-            .project(positionDeleteSchema)
-            .build()) {
-      readRecords = ImmutableList.copyOf(reader);
-    }
-
-    List<Record> expected =
-        ImmutableList.of(
-            GenericRecord.create(positionDeleteSchema)
-                .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", 
DELETE_FILE_POS.name(), 0L),
-            GenericRecord.create(positionDeleteSchema)
-                .copy(DELETE_FILE_PATH.name(), "data-file-1.parquet", 
DELETE_FILE_POS.name(), 1L));
-
-    DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), expected, 
readRecords);
-  }
-}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
new file mode 100644
index 0000000000..8c99fdf521
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestHelpers;
+
+public class TestFlinkFormatModel extends BaseFormatModelTests<RowData> {
+
+  @Override
+  protected Class<RowData> engineType() {
+    return RowData.class;
+  }
+
+  @Override
+  protected Object engineSchema(Schema schema) {
+    return FlinkSchemaUtil.convert(schema);
+  }
+
+  @Override
+  protected RowData convertToEngine(Record record, Schema schema) {
+    return RowDataConverter.convert(schema, record);
+  }
+
+  @Override
+  protected void assertEquals(Schema schema, List<RowData> expected, 
List<RowData> actual) {
+    TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema));
+  }
+}
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
new file mode 100644
index 0000000000..b7ba7a3430
--- /dev/null
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/** Converts Iceberg Record to Spark InternalRow for testing. */
+public class InternalRowConverter {
+  private static final OffsetDateTime EPOCH = 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  private InternalRowConverter() {}
+
+  public static InternalRow convert(Schema schema, Record record) {
+    return convert(schema.asStruct(), record);
+  }
+
+  private static InternalRow convert(Types.StructType struct, Record record) {
+    GenericInternalRow internalRow = new 
GenericInternalRow(struct.fields().size());
+    List<Types.NestedField> fields = struct.fields();
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+
+      Type fieldType = field.type();
+      internalRow.update(i, convert(fieldType, record.get(i)));
+    }
+
+    return internalRow;
+  }
+
+  private static Object convert(Type type, Object value) {
+    if (value == null) {
+      return null;
+    }
+
+    return switch (type.typeId()) {
+      case BOOLEAN, INTEGER, LONG, FLOAT, DOUBLE -> value;
+      case DATE -> (int) ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) value);
+      case TIMESTAMP ->
+          ((Types.TimestampType) type).shouldAdjustToUTC()
+              ? ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) value)
+              : ChronoUnit.MICROS.between(EPOCH, ((LocalDateTime) 
value).atZone(ZoneId.of("UTC")));
+      case STRING -> UTF8String.fromString((String) value);
+      case UUID -> UTF8String.fromString(value.toString());
+      case FIXED, BINARY -> {
+        ByteBuffer buffer = (ByteBuffer) value;
+        yield Arrays.copyOfRange(
+            buffer.array(),
+            buffer.arrayOffset() + buffer.position(),
+            buffer.arrayOffset() + buffer.remaining());
+      }
+      case DECIMAL -> Decimal.apply((BigDecimal) value);
+      case STRUCT -> convert((Types.StructType) type, (Record) value);
+      case LIST ->
+          new GenericArrayData(
+              ((List<?>) value)
+                  .stream()
+                      .map(element -> convert(type.asListType().elementType(), 
element))
+                      .toArray());
+      case MAP ->
+          new ArrayBasedMapData(
+              new GenericArrayData(
+                  ((Map<?, ?>) value)
+                      .keySet().stream()
+                          .map(o -> convert(type.asMapType().keyType(), o))
+                          .toArray()),
+              new GenericArrayData(
+                  ((Map<?, ?>) value)
+                      .values().stream()
+                          .map(o -> convert(type.asMapType().valueType(), o))
+                          .toArray()));
+        // TIME is not supported by Spark, VARIANT not yet implemented
+      default ->
+          throw new UnsupportedOperationException(
+              "Unsupported type for conversion to InternalRow: " + type);
+    };
+  }
+}
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
new file mode 100644
index 0000000000..c18e4c053f
--- /dev/null
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class TestSparkFormatModel extends BaseFormatModelTests<InternalRow> {
+
+  @Override
+  protected Class<InternalRow> engineType() {
+    return InternalRow.class;
+  }
+
+  @Override
+  protected Object engineSchema(Schema schema) {
+    return SparkSchemaUtil.convert(schema);
+  }
+
+  @Override
+  protected InternalRow convertToEngine(Record record, Schema schema) {
+    return InternalRowConverter.convert(schema, record);
+  }
+
+  @Override
+  protected void assertEquals(Schema schema, List<InternalRow> expected, 
List<InternalRow> actual) {
+    assertThat(actual).hasSameSizeAs(expected);
+    for (int i = 0; i < expected.size(); i++) {
+      TestHelpers.assertEquals(schema, expected.get(i), actual.get(i));
+    }
+  }
+}


Reply via email to