This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9528f85fb2 Spark, Flink: Backport add TCK for File Format API (#15619)
9528f85fb2 is described below
commit 9528f85fb29a8140a7cd6dc332f0efb4ba2592bb
Author: Joy Haldar <[email protected]>
AuthorDate: Fri Mar 13 18:38:55 2026 +0530
Spark, Flink: Backport add TCK for File Format API (#15619)
Backports #15441
---
.../iceberg/flink/data/TestFlinkFormatModel.java | 51 +++++++++
.../iceberg/flink/data/TestFlinkFormatModel.java | 51 +++++++++
.../iceberg/spark/data/InternalRowConverter.java | 115 +++++++++++++++++++++
.../iceberg/spark/data/TestSparkFormatModel.java | 54 ++++++++++
.../iceberg/spark/data/InternalRowConverter.java | 115 +++++++++++++++++++++
.../iceberg/spark/data/TestSparkFormatModel.java | 54 ++++++++++
.../iceberg/spark/data/InternalRowConverter.java | 115 +++++++++++++++++++++
.../iceberg/spark/data/TestSparkFormatModel.java | 54 ++++++++++
8 files changed, 609 insertions(+)
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
new file mode 100644
index 0000000000..8c99fdf521
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestHelpers;
+
+public class TestFlinkFormatModel extends BaseFormatModelTests<RowData> {
+
+ @Override
+ protected Class<RowData> engineType() {
+ return RowData.class;
+ }
+
+ @Override
+ protected Object engineSchema(Schema schema) {
+ return FlinkSchemaUtil.convert(schema);
+ }
+
+ @Override
+ protected RowData convertToEngine(Record record, Schema schema) {
+ return RowDataConverter.convert(schema, record);
+ }
+
+ @Override
+  protected void assertEquals(Schema schema, List<RowData> expected, List<RowData> actual) {
+ TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema));
+ }
+}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
new file mode 100644
index 0000000000..8c99fdf521
--- /dev/null
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkFormatModel.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.flink.TestHelpers;
+
+public class TestFlinkFormatModel extends BaseFormatModelTests<RowData> {
+
+ @Override
+ protected Class<RowData> engineType() {
+ return RowData.class;
+ }
+
+ @Override
+ protected Object engineSchema(Schema schema) {
+ return FlinkSchemaUtil.convert(schema);
+ }
+
+ @Override
+ protected RowData convertToEngine(Record record, Schema schema) {
+ return RowDataConverter.convert(schema, record);
+ }
+
+ @Override
+  protected void assertEquals(Schema schema, List<RowData> expected, List<RowData> actual) {
+ TestHelpers.assertRows(actual, expected, FlinkSchemaUtil.convert(schema));
+ }
+}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
new file mode 100644
index 0000000000..b7ba7a3430
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/** Converts Iceberg Record to Spark InternalRow for testing. */
+public class InternalRowConverter {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+ private InternalRowConverter() {}
+
+ public static InternalRow convert(Schema schema, Record record) {
+ return convert(schema.asStruct(), record);
+ }
+
+ private static InternalRow convert(Types.StructType struct, Record record) {
+    GenericInternalRow internalRow = new GenericInternalRow(struct.fields().size());
+ List<Types.NestedField> fields = struct.fields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Types.NestedField field = fields.get(i);
+
+ Type fieldType = field.type();
+ internalRow.update(i, convert(fieldType, record.get(i)));
+ }
+
+ return internalRow;
+ }
+
+ private static Object convert(Type type, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return switch (type.typeId()) {
+ case BOOLEAN, INTEGER, LONG, FLOAT, DOUBLE -> value;
+ case DATE -> (int) ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) value);
+ case TIMESTAMP ->
+ ((Types.TimestampType) type).shouldAdjustToUTC()
+ ? ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) value)
+              : ChronoUnit.MICROS.between(EPOCH, ((LocalDateTime) value).atZone(ZoneId.of("UTC")));
+ case STRING -> UTF8String.fromString((String) value);
+ case UUID -> UTF8String.fromString(value.toString());
+ case FIXED, BINARY -> {
+ ByteBuffer buffer = (ByteBuffer) value;
+ yield Arrays.copyOfRange(
+ buffer.array(),
+ buffer.arrayOffset() + buffer.position(),
+ buffer.arrayOffset() + buffer.remaining());
+ }
+ case DECIMAL -> Decimal.apply((BigDecimal) value);
+ case STRUCT -> convert((Types.StructType) type, (Record) value);
+ case LIST ->
+ new GenericArrayData(
+ ((List<?>) value)
+ .stream()
+                  .map(element -> convert(type.asListType().elementType(), element))
+ .toArray());
+ case MAP ->
+ new ArrayBasedMapData(
+ new GenericArrayData(
+ ((Map<?, ?>) value)
+ .keySet().stream()
+ .map(o -> convert(type.asMapType().keyType(), o))
+ .toArray()),
+ new GenericArrayData(
+ ((Map<?, ?>) value)
+ .values().stream()
+ .map(o -> convert(type.asMapType().valueType(), o))
+ .toArray()));
+ // TIME is not supported by Spark, VARIANT not yet implemented
+ default ->
+ throw new UnsupportedOperationException(
+ "Unsupported type for conversion to InternalRow: " + type);
+ };
+ }
+}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
new file mode 100644
index 0000000000..c18e4c053f
--- /dev/null
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class TestSparkFormatModel extends BaseFormatModelTests<InternalRow> {
+
+ @Override
+ protected Class<InternalRow> engineType() {
+ return InternalRow.class;
+ }
+
+ @Override
+ protected Object engineSchema(Schema schema) {
+ return SparkSchemaUtil.convert(schema);
+ }
+
+ @Override
+ protected InternalRow convertToEngine(Record record, Schema schema) {
+ return InternalRowConverter.convert(schema, record);
+ }
+
+ @Override
+  protected void assertEquals(Schema schema, List<InternalRow> expected, List<InternalRow> actual) {
+ assertThat(actual).hasSameSizeAs(expected);
+ for (int i = 0; i < expected.size(); i++) {
+ TestHelpers.assertEquals(schema, expected.get(i), actual.get(i));
+ }
+ }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
new file mode 100644
index 0000000000..b7ba7a3430
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/** Converts Iceberg Record to Spark InternalRow for testing. */
+public class InternalRowConverter {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+ private InternalRowConverter() {}
+
+ public static InternalRow convert(Schema schema, Record record) {
+ return convert(schema.asStruct(), record);
+ }
+
+ private static InternalRow convert(Types.StructType struct, Record record) {
+    GenericInternalRow internalRow = new GenericInternalRow(struct.fields().size());
+ List<Types.NestedField> fields = struct.fields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Types.NestedField field = fields.get(i);
+
+ Type fieldType = field.type();
+ internalRow.update(i, convert(fieldType, record.get(i)));
+ }
+
+ return internalRow;
+ }
+
+ private static Object convert(Type type, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return switch (type.typeId()) {
+ case BOOLEAN, INTEGER, LONG, FLOAT, DOUBLE -> value;
+ case DATE -> (int) ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) value);
+ case TIMESTAMP ->
+ ((Types.TimestampType) type).shouldAdjustToUTC()
+ ? ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) value)
+              : ChronoUnit.MICROS.between(EPOCH, ((LocalDateTime) value).atZone(ZoneId.of("UTC")));
+ case STRING -> UTF8String.fromString((String) value);
+ case UUID -> UTF8String.fromString(value.toString());
+ case FIXED, BINARY -> {
+ ByteBuffer buffer = (ByteBuffer) value;
+ yield Arrays.copyOfRange(
+ buffer.array(),
+ buffer.arrayOffset() + buffer.position(),
+ buffer.arrayOffset() + buffer.remaining());
+ }
+ case DECIMAL -> Decimal.apply((BigDecimal) value);
+ case STRUCT -> convert((Types.StructType) type, (Record) value);
+ case LIST ->
+ new GenericArrayData(
+ ((List<?>) value)
+ .stream()
+                  .map(element -> convert(type.asListType().elementType(), element))
+ .toArray());
+ case MAP ->
+ new ArrayBasedMapData(
+ new GenericArrayData(
+ ((Map<?, ?>) value)
+ .keySet().stream()
+ .map(o -> convert(type.asMapType().keyType(), o))
+ .toArray()),
+ new GenericArrayData(
+ ((Map<?, ?>) value)
+ .values().stream()
+ .map(o -> convert(type.asMapType().valueType(), o))
+ .toArray()));
+ // TIME is not supported by Spark, VARIANT not yet implemented
+ default ->
+ throw new UnsupportedOperationException(
+ "Unsupported type for conversion to InternalRow: " + type);
+ };
+ }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
new file mode 100644
index 0000000000..c18e4c053f
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class TestSparkFormatModel extends BaseFormatModelTests<InternalRow> {
+
+ @Override
+ protected Class<InternalRow> engineType() {
+ return InternalRow.class;
+ }
+
+ @Override
+ protected Object engineSchema(Schema schema) {
+ return SparkSchemaUtil.convert(schema);
+ }
+
+ @Override
+ protected InternalRow convertToEngine(Record record, Schema schema) {
+ return InternalRowConverter.convert(schema, record);
+ }
+
+ @Override
+  protected void assertEquals(Schema schema, List<InternalRow> expected, List<InternalRow> actual) {
+ assertThat(actual).hasSameSizeAs(expected);
+ for (int i = 0; i < expected.size(); i++) {
+ TestHelpers.assertEquals(schema, expected.get(i), actual.get(i));
+ }
+ }
+}
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
new file mode 100644
index 0000000000..b7ba7a3430
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/InternalRowConverter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/** Converts Iceberg Record to Spark InternalRow for testing. */
+public class InternalRowConverter {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+ private InternalRowConverter() {}
+
+ public static InternalRow convert(Schema schema, Record record) {
+ return convert(schema.asStruct(), record);
+ }
+
+ private static InternalRow convert(Types.StructType struct, Record record) {
+    GenericInternalRow internalRow = new GenericInternalRow(struct.fields().size());
+ List<Types.NestedField> fields = struct.fields();
+ for (int i = 0; i < fields.size(); i += 1) {
+ Types.NestedField field = fields.get(i);
+
+ Type fieldType = field.type();
+ internalRow.update(i, convert(fieldType, record.get(i)));
+ }
+
+ return internalRow;
+ }
+
+ private static Object convert(Type type, Object value) {
+ if (value == null) {
+ return null;
+ }
+
+ return switch (type.typeId()) {
+ case BOOLEAN, INTEGER, LONG, FLOAT, DOUBLE -> value;
+ case DATE -> (int) ChronoUnit.DAYS.between(EPOCH_DAY, (LocalDate) value);
+ case TIMESTAMP ->
+ ((Types.TimestampType) type).shouldAdjustToUTC()
+ ? ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) value)
+              : ChronoUnit.MICROS.between(EPOCH, ((LocalDateTime) value).atZone(ZoneId.of("UTC")));
+ case STRING -> UTF8String.fromString((String) value);
+ case UUID -> UTF8String.fromString(value.toString());
+ case FIXED, BINARY -> {
+ ByteBuffer buffer = (ByteBuffer) value;
+ yield Arrays.copyOfRange(
+ buffer.array(),
+ buffer.arrayOffset() + buffer.position(),
+ buffer.arrayOffset() + buffer.remaining());
+ }
+ case DECIMAL -> Decimal.apply((BigDecimal) value);
+ case STRUCT -> convert((Types.StructType) type, (Record) value);
+ case LIST ->
+ new GenericArrayData(
+ ((List<?>) value)
+ .stream()
+                  .map(element -> convert(type.asListType().elementType(), element))
+ .toArray());
+ case MAP ->
+ new ArrayBasedMapData(
+ new GenericArrayData(
+ ((Map<?, ?>) value)
+ .keySet().stream()
+ .map(o -> convert(type.asMapType().keyType(), o))
+ .toArray()),
+ new GenericArrayData(
+ ((Map<?, ?>) value)
+ .values().stream()
+ .map(o -> convert(type.asMapType().valueType(), o))
+ .toArray()));
+ // TIME is not supported by Spark, VARIANT not yet implemented
+ default ->
+ throw new UnsupportedOperationException(
+ "Unsupported type for conversion to InternalRow: " + type);
+ };
+ }
+}
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
new file mode 100644
index 0000000000..c18e4c053f
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkFormatModel.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.BaseFormatModelTests;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+public class TestSparkFormatModel extends BaseFormatModelTests<InternalRow> {
+
+ @Override
+ protected Class<InternalRow> engineType() {
+ return InternalRow.class;
+ }
+
+ @Override
+ protected Object engineSchema(Schema schema) {
+ return SparkSchemaUtil.convert(schema);
+ }
+
+ @Override
+ protected InternalRow convertToEngine(Record record, Schema schema) {
+ return InternalRowConverter.convert(schema, record);
+ }
+
+ @Override
+  protected void assertEquals(Schema schema, List<InternalRow> expected, List<InternalRow> actual) {
+ assertThat(actual).hasSameSizeAs(expected);
+ for (int i = 0; i < expected.size(); i++) {
+ TestHelpers.assertEquals(schema, expected.get(i), actual.get(i));
+ }
+ }
+}