This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 7e28be434c support StructInternalRow.getVariant (#14379)
7e28be434c is described below
commit 7e28be434c64afb9675c743335734bb8217c1df5
Author: Huaxin Gao <[email protected]>
AuthorDate: Sat Nov 8 20:41:33 2025 -0800
support StructInternalRow.getVariant (#14379)
* support StructInternalRow.getVariant
* add a test for GetVariantPassesThroughVariantVal
* address comments
* remove path for VariantVal
---
.../iceberg/spark/source/StructInternalRow.java | 40 ++++-
.../spark/source/TestStructInternalRowVariant.java | 182 +++++++++++++++++++++
2 files changed, 218 insertions(+), 4 deletions(-)
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
index 27fb014d8c..2d3c917e58 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
@@ -36,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.variants.Variant;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -58,6 +60,7 @@ import org.apache.spark.sql.types.ShortType;
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.VariantType;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;
@@ -229,14 +232,19 @@ class StructInternalRow extends InternalRow {
return isNullAt(ordinal) ? null : getMapInternal(ordinal);
}
+ private MapData getMapInternal(int ordinal) {
+ return mapToMapData(
+        type.fields().get(ordinal).type().asMapType(), struct.get(ordinal, Map.class));
+ }
+
@Override
public VariantVal getVariant(int ordinal) {
- throw new UnsupportedOperationException("Unsupported method: getVariant");
+ return isNullAt(ordinal) ? null : getVariantInternal(ordinal);
}
- private MapData getMapInternal(int ordinal) {
- return mapToMapData(
-        type.fields().get(ordinal).type().asMapType(), struct.get(ordinal, Map.class));
+ private VariantVal getVariantInternal(int ordinal) {
+ Object value = struct.get(ordinal, Object.class);
+ return toVariantVal(value);
}
@Override
@@ -276,6 +284,8 @@ class StructInternalRow extends InternalRow {
return getInt(ordinal);
} else if (dataType instanceof TimestampType) {
return getLong(ordinal);
+ } else if (dataType instanceof VariantType) {
+ return getVariantInternal(ordinal);
}
return null;
}
@@ -338,11 +348,33 @@ class StructInternalRow extends InternalRow {
array ->
(BiConsumer<Integer, Map<?, ?>>)
                 (pos, map) -> array[pos] = mapToMapData(elementType.asMapType(), map));
+ case VARIANT:
+ return fillArray(
+ values,
+            array -> (BiConsumer<Integer, Object>) (pos, v) -> array[pos] = toVariantVal(v));
default:
throw new UnsupportedOperationException("Unsupported array element
type: " + elementType);
}
}
+ private static VariantVal toVariantVal(Object value) {
+ if (value instanceof Variant) {
+ Variant variant = (Variant) value;
+ byte[] metadataBytes = new byte[variant.metadata().sizeInBytes()];
+      ByteBuffer metadataBuffer = ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN);
+ variant.metadata().writeTo(metadataBuffer, 0);
+
+ byte[] valueBytes = new byte[variant.value().sizeInBytes()];
+      ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN);
+ variant.value().writeTo(valueBuffer, 0);
+
+ return new VariantVal(valueBytes, metadataBytes);
+ }
+
+ throw new UnsupportedOperationException(
+        "Unsupported value for VARIANT in StructInternalRow: " + value.getClass());
+ }
+
@SuppressWarnings("unchecked")
private <T> GenericArrayData fillArray(
Collection<?> values, Function<Object[], BiConsumer<Integer, T>>
makeSetter) {
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructInternalRowVariant.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructInternalRowVariant.java
new file mode 100644
index 0000000000..78e7e23b86
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructInternalRowVariant.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantValue;
+import org.apache.iceberg.variants.Variants;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.VariantType$;
+import org.apache.spark.unsafe.types.VariantVal;
+import org.junit.jupiter.api.Test;
+
+public class TestStructInternalRowVariant {
+
+ @Test
+ public void testGetVariantReturnsVariantVal() {
+ Types.StructType structType = variantStructType();
+ GenericRecord rec = newRecord(structType);
+ Variant variant = sampleVariant();
+ rec.set(0, variant);
+
+ InternalRow row = new StructInternalRow(structType).setStruct(rec);
+
+ VariantVal actual = row.getVariant(0);
+ assertThat(actual).isNotNull();
+
+ VariantMetadata metadata =
+        VariantMetadata.from(ByteBuffer.wrap(actual.getMetadata()).order(ByteOrder.LITTLE_ENDIAN));
+ assertThat(metadata.dictionarySize()).isEqualTo(1);
+ assertThat(metadata.get(0)).isEqualTo("k");
+
+ VariantValue actualValue =
+ VariantValue.from(
+            metadata, ByteBuffer.wrap(actual.getValue()).order(ByteOrder.LITTLE_ENDIAN));
+
+    assertThat(actualValue.asObject().get("k").asPrimitive().get()).isEqualTo("v1");
+ }
+
+ @Test
+ public void testGetVariantNull() {
+ Types.StructType structType = variantStructType();
+ GenericRecord rec = newRecord(structType);
+ rec.set(0, null);
+
+ InternalRow row = new StructInternalRow(structType).setStruct(rec);
+ assertThat(row.getVariant(0)).isNull();
+ }
+
+ @Test
+ public void testArrayOfVariant() {
+    Types.ListType listType = Types.ListType.ofOptional(2, Types.VariantType.get());
+ Types.StructType structType =
+ Types.StructType.of(Types.NestedField.optional(1, "arr", listType));
+
+ GenericRecord rec = GenericRecord.create(structType);
+
+ Variant v1 = sampleVariant();
+ Variant v2 = sampleVariant();
+
+ List<Object> elements = Arrays.asList(v1, v2, null);
+ rec.set(0, elements);
+
+ InternalRow row = new StructInternalRow(structType).setStruct(rec);
+ ArrayData arr = row.getArray(0);
+
+ Object firstVar = arr.get(0, VariantType$.MODULE$);
+ Object secondVar = arr.get(1, VariantType$.MODULE$);
+
+ assertThat(firstVar).isInstanceOf(VariantVal.class);
+ assertThat(secondVar).isInstanceOf(VariantVal.class);
+ assertThat(arr.isNullAt(2)).isTrue();
+
+ assertVariantValEqualsKV((VariantVal) firstVar, "k", "v1");
+ assertVariantValEqualsKV((VariantVal) secondVar, "k", "v1");
+ }
+
+ @Test
+ public void testMapWithVariant() {
+ Types.MapType mapType =
+        Types.MapType.ofOptional(2, 3, Types.StringType.get(), Types.VariantType.get());
+    Types.StructType structType = Types.StructType.of(Types.NestedField.optional(1, "m", mapType));
+
+ GenericRecord rec = GenericRecord.create(structType);
+ Map<String, Object> map = Maps.newHashMap();
+ map.put("a", sampleVariant());
+ map.put("b", sampleVariant());
+ rec.set(0, map);
+
+ InternalRow row = new StructInternalRow(structType).setStruct(rec);
+ MapData mapData = row.getMap(0);
+
+ ArrayData values = mapData.valueArray();
+ for (int i = 0; i < values.numElements(); i++) {
+ Object variant = values.get(i, VariantType$.MODULE$);
+ assertThat(variant).isInstanceOf(VariantVal.class);
+ assertVariantValEqualsKV((VariantVal) variant, "k", "v1");
+ }
+ }
+
+ @Test
+ public void testNestedStructVariant() {
+ Types.StructType variant =
+        Types.StructType.of(Types.NestedField.optional(2, "v", Types.VariantType.get()));
+ Types.StructType structVariant =
+ Types.StructType.of(Types.NestedField.optional(1, "n", variant));
+
+ // Case 1: nested struct holds Iceberg Variant
+ GenericRecord variantStructRec = GenericRecord.create(variant);
+ variantStructRec.set(0, sampleVariant());
+ GenericRecord structRec = GenericRecord.create(structVariant);
+ structRec.set(0, variantStructRec);
+
+    InternalRow structRow = new StructInternalRow(structVariant).setStruct(structRec);
+ InternalRow nested = structRow.getStruct(0, 1);
+ VariantVal variantVal1 = nested.getVariant(0);
+ assertVariantValEqualsKV(variantVal1, "k", "v1");
+ }
+
+ @Test
+ public void testGetWithVariantType() {
+ Types.StructType structType = variantStructType();
+ GenericRecord rec = newRecord(structType);
+ rec.set(0, sampleVariant());
+
+ InternalRow row = new StructInternalRow(structType).setStruct(rec);
+ Object obj = row.get(0, VariantType$.MODULE$);
+ assertThat(obj).isInstanceOf(VariantVal.class);
+ assertVariantValEqualsKV((VariantVal) obj, "k", "v1");
+ }
+
+ private static Types.StructType variantStructType() {
+    return Types.StructType.of(Types.NestedField.optional(1, "a", Types.VariantType.get()));
+ }
+
+ private static GenericRecord newRecord(Types.StructType structType) {
+ return GenericRecord.create(structType);
+ }
+
+ private static Variant sampleVariant() {
+ VariantMetadata md = Variants.metadata("k");
+ org.apache.iceberg.variants.ShreddedObject obj = Variants.object(md);
+ obj.put("k", Variants.of("v1"));
+ return Variant.of(md, obj);
+ }
+
+  private static void assertVariantValEqualsKV(VariantVal vv, String key, String expected) {
+ VariantMetadata metadata =
+        VariantMetadata.from(ByteBuffer.wrap(vv.getMetadata()).order(ByteOrder.LITTLE_ENDIAN));
+ VariantValue value =
+        VariantValue.from(metadata, ByteBuffer.wrap(vv.getValue()).order(ByteOrder.LITTLE_ENDIAN));
+
assertThat(value.asObject().get(key).asPrimitive().get()).isEqualTo(expected);
+ }
+}