This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 88500ecb45 Core: Rewrite the Iceberg Arrow schema translation to use the visitor pattern (#13699)
88500ecb45 is described below
commit 88500ecb457299cd46c5e075d29556ffdd5eaad5
Author: Anoop Johnson <[email protected]>
AuthorDate: Tue Aug 19 10:49:57 2025 -0600
Core: Rewrite the Iceberg Arrow schema translation to use the visitor pattern (#13699)
---
.../org/apache/iceberg/arrow/ArrowSchemaUtil.java | 205 +++++++++++++--------
.../apache/iceberg/arrow/TestArrowSchemaUtil.java | 80 +++++++-
2 files changed, 203 insertions(+), 82 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
index 34cc3b45fd..c70e4ff3ff 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.arrow;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.arrow.vector.types.DateUnit;
@@ -30,6 +31,8 @@ import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.MapType;
@@ -51,94 +54,134 @@ public class ArrowSchemaUtil {
public static Schema convert(final org.apache.iceberg.Schema schema) {
ImmutableList.Builder<Field> fields = ImmutableList.builder();
- for (NestedField f : schema.columns()) {
- fields.add(convert(f));
+ for (NestedField field : schema.columns()) { // one Arrow Field per top-level Iceberg column
+ fields.add(TypeUtil.visit(field.type(), new
IcebergToArrowTypeConverter(field))); // NOTE(review): same expression as convert(NestedField) below; could be fields.add(convert(field))
}
return new Schema(fields.build());
}
public static Field convert(final NestedField field) {
- final ArrowType arrowType;
-
- final List<Field> children = Lists.newArrayList();
- Map<String, String> metadata = null;
-
- switch (field.type().typeId()) {
- case BINARY:
- arrowType = ArrowType.Binary.INSTANCE;
- break;
- case FIXED:
- final Types.FixedType fixedType = (Types.FixedType) field.type();
- arrowType = new ArrowType.FixedSizeBinary(fixedType.length());
- break;
- case BOOLEAN:
- arrowType = ArrowType.Bool.INSTANCE;
- break;
- case INTEGER:
- arrowType = new ArrowType.Int(Integer.SIZE, true /* signed */);
- break;
- case LONG:
- arrowType = new ArrowType.Int(Long.SIZE, true /* signed */);
- break;
- case FLOAT:
- arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
- break;
- case DOUBLE:
- arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
- break;
- case DECIMAL:
- final Types.DecimalType decimalType = (Types.DecimalType) field.type();
- arrowType = new ArrowType.Decimal(decimalType.precision(),
decimalType.scale(), 128);
- break;
- case STRING:
- arrowType = ArrowType.Utf8.INSTANCE;
- break;
- case TIME:
- arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
- break;
- case UUID:
- arrowType = new ArrowType.FixedSizeBinary(16);
- break;
- case TIMESTAMP:
- arrowType =
- new ArrowType.Timestamp(
- TimeUnit.MICROSECOND,
- ((Types.TimestampType) field.type()).shouldAdjustToUTC() ?
"UTC" : null);
- break;
- case DATE:
- arrowType = new ArrowType.Date(DateUnit.DAY);
- break;
- case STRUCT:
- final StructType struct = field.type().asStructType();
- arrowType = ArrowType.Struct.INSTANCE;
-
- for (NestedField nested : struct.fields()) {
- children.add(convert(nested));
- }
- break;
- case LIST:
- final ListType listType = field.type().asListType();
- arrowType = ArrowType.List.INSTANCE;
-
- for (NestedField nested : listType.fields()) {
- children.add(convert(nested));
- }
- break;
- case MAP:
- metadata = ImmutableMap.of(ORIGINAL_TYPE, MAP_TYPE);
- final MapType mapType = field.type().asMapType();
- arrowType = new ArrowType.Map(false);
- List<Field> entryFields = Lists.transform(mapType.fields(),
ArrowSchemaUtil::convert);
- Field entry =
- new Field("", new FieldType(field.isOptional(), arrowType, null),
entryFields);
- children.add(entry);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported field type: " +
field);
+ return TypeUtil.visit(field.type(), new
IcebergToArrowTypeConverter(field)); // name/nullability taken from field; Arrow type from visiting field.type()
+ }
+ // Converts one Iceberg field's type to an Arrow Field; name and nullability come from currentField.
+ private static class IcebergToArrowTypeConverter extends
TypeUtil.SchemaVisitor<Field> {
+ private final NestedField currentField;
+ // Every Field built by this instance carries currentField's name and nullability.
+ IcebergToArrowTypeConverter(NestedField field) {
+ this.currentField = field;
+ }
+ // NOTE(review): convert() visits field types, never a Schema, so this pass-through appears unused; verify.
+ @Override
+ public Field schema(org.apache.iceberg.Schema schema, Field structResult) {
+ return structResult;
+ }
+ // NOTE(review): fieldResults is unused; children are re-converted in convertChildren, so nested types may be visited repeatedly.
+ @Override
+ public Field struct(StructType struct, List<Field> fieldResults) {
+ return new Field(
+ currentField.name(),
+ new FieldType(currentField.isOptional(), ArrowType.Struct.INSTANCE,
null),
+ convertChildren(struct.fields()));
+ }
+ // Pass-through: names/nullability are applied from currentField, not from the visited NestedField.
+ @Override
+ public Field field(NestedField field, Field fieldResult) {
+ return fieldResult;
}
- return new Field(
- field.name(), new FieldType(field.isOptional(), arrowType, null,
metadata), children);
+ @Override
+ public Field list(ListType list, Field elementResult) { // elementResult unused, as in struct()
+ return new Field(
+ currentField.name(),
+ new FieldType(currentField.isOptional(), ArrowType.List.INSTANCE,
null),
+ convertChildren(list.fields()));
+ }
+ // Arrow Map field tagged with ORIGINAL_TYPE -> MAP_TYPE metadata, wrapping one unnamed entry field.
+ @Override
+ public Field map(MapType map, Field keyResult, Field valueResult) {
+ Map<String, String> metadata = ImmutableMap.of(ORIGINAL_TYPE, MAP_TYPE);
+ ArrowType arrowType = new ArrowType.Map(false);
+ // false = keysSorted flag (entries are not declared as sorted by key).
+ List<Field> entryFields = convertChildren(map.fields());
+ // NOTE(review): entry reuses the Map type and the map's nullability (as the removed code did); Arrow normally expects a non-null Struct entry - confirm.
+ Field entry =
+ new Field("", new FieldType(currentField.isOptional(), arrowType,
null), entryFields);
+ List<Field> children = Lists.newArrayList(entry);
+ // The entry field is the map field's only child.
+ return new Field(
+ currentField.name(),
+ new FieldType(currentField.isOptional(), arrowType, null, metadata),
+ children);
+ }
+ // Converts child NestedFields (struct fields, list element, map key/value) in their declared order.
+ private List<Field> convertChildren(Collection<NestedField> children) {
+ List<Field> converted = Lists.newArrayListWithCapacity(children.size());
+ // One converter per child, since currentField is fixed per instance.
+ for (NestedField child : children) {
+ converted.add(TypeUtil.visit(child.type(), new
IcebergToArrowTypeConverter(child)));
+ }
+
+ return converted;
+ }
+ // Leaf types: TIME/TIMESTAMP in microseconds, DECIMAL 128-bit, UUID as 16-byte fixed binary.
+ @Override
+ public Field primitive(Type.PrimitiveType primitive) {
+ final ArrowType arrowType;
+ // Type ids not listed below fall to default and throw UnsupportedOperationException.
+ switch (primitive.typeId()) {
+ case BINARY:
+ arrowType = ArrowType.Binary.INSTANCE;
+ break;
+ case FIXED:
+ final Types.FixedType fixedType = (Types.FixedType) primitive;
+ arrowType = new ArrowType.FixedSizeBinary(fixedType.length());
+ break;
+ case BOOLEAN:
+ arrowType = ArrowType.Bool.INSTANCE;
+ break;
+ case INTEGER:
+ arrowType = new ArrowType.Int(Integer.SIZE, true /* signed */);
+ break;
+ case LONG:
+ arrowType = new ArrowType.Int(Long.SIZE, true /* signed */);
+ break;
+ case FLOAT:
+ arrowType = new
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ break;
+ case DOUBLE:
+ arrowType = new
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ break;
+ case DECIMAL:
+ final Types.DecimalType decimalType = (Types.DecimalType) primitive;
+ arrowType = new ArrowType.Decimal(decimalType.precision(),
decimalType.scale(), 128);
+ break;
+ case STRING:
+ arrowType = ArrowType.Utf8.INSTANCE;
+ break;
+ case TIME:
+ arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE);
+ break;
+ case UUID:
+ arrowType = new ArrowType.FixedSizeBinary(16);
+ break;
+ case TIMESTAMP:
+ arrowType =
+ new ArrowType.Timestamp(
+ TimeUnit.MICROSECOND,
+ ((Types.TimestampType) primitive).shouldAdjustToUTC() ?
"UTC" : null);
+ break;
+ case DATE:
+ arrowType = new ArrowType.Date(DateUnit.DAY);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported primitive type:
" + primitive);
+ }
+ // Leaf field: no children.
+ return new Field(
+ currentField.name(),
+ new FieldType(currentField.isOptional(), arrowType, null),
+ Lists.newArrayList());
+ }
}
}
diff --git
a/arrow/src/test/java/org/apache/iceberg/arrow/TestArrowSchemaUtil.java
b/arrow/src/test/java/org/apache/iceberg/arrow/TestArrowSchemaUtil.java
index 6fcc234f73..ad132c7bc8 100644
--- a/arrow/src/test/java/org/apache/iceberg/arrow/TestArrowSchemaUtil.java
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/TestArrowSchemaUtil.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.types.Types.ListType;
import org.apache.iceberg.types.Types.LongType;
import org.apache.iceberg.types.Types.MapType;
import org.apache.iceberg.types.Types.StringType;
+import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.types.Types.TimeType;
import org.apache.iceberg.types.Types.TimestampType;
import org.junit.jupiter.api.Test;
@@ -87,7 +88,7 @@ public class TestArrowSchemaUtil {
}
@Test
- public void convertComplex() {
+ public void convertMap() {
Schema iceberg =
new Schema(
Types.NestedField.optional(
@@ -99,6 +100,83 @@ public class TestArrowSchemaUtil {
4, 5, StringType.get(), ListType.ofOptional(6,
TimestampType.withoutZone()))));
org.apache.arrow.vector.types.pojo.Schema arrow =
ArrowSchemaUtil.convert(iceberg);
assertThat(arrow.getFields()).hasSameSizeAs(iceberg.columns());
+
+ // Validate simple map with primitive values
+ Field simpleMap = arrow.findField("m");
+ assertThat(simpleMap).isNotNull();
+
assertThat(simpleMap.getType().getTypeID()).isEqualTo(ArrowType.ArrowTypeID.Map);
+ assertThat(simpleMap.getChildren()).hasSize(1);
+ Field simpleEntry = simpleMap.getChildren().get(0);
+ assertThat(simpleEntry.getChildren()).hasSize(2);
+ assertThat(simpleEntry.getChildren().get(0).getType().getTypeID())
+ .isEqualTo(ArrowType.ArrowTypeID.Utf8);
+ assertThat(simpleEntry.getChildren().get(1).getType().getTypeID())
+ .isEqualTo(ArrowType.ArrowTypeID.Int);
+
+ // Validate map with complex values (list of timestamps)
+ Field complexMap = arrow.findField("m2");
+ assertThat(complexMap).isNotNull();
+
assertThat(complexMap.getType().getTypeID()).isEqualTo(ArrowType.ArrowTypeID.Map);
+ assertThat(complexMap.getChildren()).hasSize(1);
+ Field complexEntry = complexMap.getChildren().get(0);
+ assertThat(complexEntry.getChildren()).hasSize(2);
+ assertThat(complexEntry.getChildren().get(0).getType().getTypeID())
+ .isEqualTo(ArrowType.ArrowTypeID.Utf8);
+ assertThat(complexEntry.getChildren().get(1).getType().getTypeID())
+ .isEqualTo(ArrowType.List.TYPE_TYPE);
+
+ // Validate the list element type within the map value
+ Field listValue = complexEntry.getChildren().get(1);
+ assertThat(listValue.getChildren()).hasSize(1);
+ assertThat(listValue.getChildren().get(0).getType().getTypeID())
+ .isEqualTo(ArrowType.ArrowTypeID.Timestamp);
+ }
+
+ @Test
+ public void convertStruct() {
+ Schema iceberg =
+ new Schema(
+ Types.NestedField.optional(
+ 0,
+ STRUCT_FIELD,
+ StructType.of(
+ Types.NestedField.required(1, "inner_string",
StringType.get()),
+ Types.NestedField.optional(2, "inner_int",
IntegerType.get()))));
+ // Convert, then check the struct column keeps both children in declaration order.
+ org.apache.arrow.vector.types.pojo.Schema arrow =
ArrowSchemaUtil.convert(iceberg);
+ validate(iceberg, arrow);
+ // NOTE(review): only child count and names are asserted in this method; child types and nullability are not.
+ Field structField = arrow.findField(STRUCT_FIELD);
+ assertThat(structField).isNotNull();
+ assertThat(structField.getChildren()).hasSize(2);
+
assertThat(structField.getChildren().get(0).getName()).isEqualTo("inner_string");
+
assertThat(structField.getChildren().get(1).getName()).isEqualTo("inner_int");
+ }
+
+ @Test
+ public void convertNestedStructInList() {
+ Schema iceberg =
+ new Schema(
+ Types.NestedField.optional(
+ 0,
+ "lt",
+ ListType.ofOptional(
+ 1,
+ StructType.of(
+ Types.NestedField.required(2, "nested_field",
StringType.get())))));
+ // list<struct<nested_field: string>> is expected to become Arrow List -> Struct -> one child.
+ org.apache.arrow.vector.types.pojo.Schema arrow =
ArrowSchemaUtil.convert(iceberg);
+ // Walk the Arrow tree: the list column, its single element, then the struct's child.
+ validate(iceberg, arrow);
+ Field listField = arrow.findField("lt");
+ assertThat(listField).isNotNull();
+
assertThat(listField.getType().getTypeID()).isEqualTo(ArrowType.List.TYPE_TYPE);
+ assertThat(listField.getChildren()).hasSize(1);
+ // The list's only child is the struct element field.
+ Field structElement = listField.getChildren().get(0);
+
assertThat(structElement.getType().getTypeID()).isEqualTo(ArrowType.Struct.TYPE_TYPE);
+ assertThat(structElement.getChildren()).hasSize(1);
+
assertThat(structElement.getChildren().get(0).getName()).isEqualTo("nested_field");
}
private void validate(Schema iceberg,
org.apache.arrow.vector.types.pojo.Schema arrow) {