rdblue commented on code in PR #13219:
URL: https://github.com/apache/iceberg/pull/13219#discussion_r2229464159
##########
spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java:
##########
@@ -605,4 +723,172 @@ public void testRowLineage() throws Exception {
record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)),
record.copy(Map.of("id", 5L, "data", "e"))));
}
+
+  @Test
+  public void testUnshreddedVariant() throws IOException {
+    Assumptions.assumeThat(supportsVariant()).as("Variant support is not implemented").isTrue();
+
+    Schema schema =
+        new Schema(required(0, "id", LongType.get()), optional(1, "data", Types.VariantType.get()));
+
+    writeAndValidate(schema);
+  }
+
+  @Test
+  public void testShreddedVariant() throws IOException {
+    Assumptions.assumeThat(supportsVariant()).as("Variant support is not implemented").isTrue();
+
+    GroupType fieldA = field("a", shreddedPrimitive(PrimitiveType.PrimitiveTypeName.INT32));
+    GroupType fieldB =
+        field(
+            "b",
+            shreddedPrimitive(
+                PrimitiveType.PrimitiveTypeName.BINARY, LogicalTypeAnnotation.stringType()));
+    GroupType objectFields = objectFields(fieldA, fieldB);
+    GroupType variantType = variant("var", 2, objectFields);
+    MessageType parquetSchema = parquetSchema(variantType);
+
+    Record recordA = record(fieldA, Map.of("value", serialize(Variants.ofNull())));
+    Record recordB = record(fieldB, Map.of("typed_value", "iceberg"));
+    Record fields = record(objectFields, Map.of("a", recordA, "b", recordB));
+    Record variant =
+        record(variantType, Map.of("metadata", TEST_METADATA_BUFFER, "typed_value", fields));
+    Record record = record(parquetSchema, Map.of("id", 1, "var", variant));
+    InternalRow actual = writeAndRead(icebergSchema(parquetSchema), SCHEMA, record);
+
+    Record expected = GenericRecord.create(SCHEMA);
+    expected.set(0, 1);
+    ShreddedObject expectedObject = Variants.object(TEST_METADATA);
+    expectedObject.put("a", Variants.ofNull());
+    expectedObject.put("b", Variants.of("iceberg"));
+    expected.set(1, Variant.of(TEST_METADATA, expectedObject));
+
+    GenericsHelpers.assertEqualsUnsafe(SCHEMA.asStruct(), expected, actual);
+  }
+
+  private static InternalRow writeAndRead(Schema writeSchema, Schema expectedSchema, Record record)
+      throws IOException {
+    return Iterables.getOnlyElement(writeAndRead(writeSchema, expectedSchema, List.of(record)));
+  }
+
+  private static List<InternalRow> writeAndRead(
+      Schema writeSchema, Schema expectedSchema, List<Record> records) throws IOException {
+    OutputFile output = new InMemoryOutputFile();
+    try (FileAppender<Record> writer =
+        Parquet.write(output)
+            .schema(writeSchema)
+            .createWriterFunc(GenericParquetWriter::create)
+            .named("test")
+            .build()) {
+      writer.addAll(records);
+    }
+
+    try (CloseableIterable<InternalRow> reader =
+        Parquet.read(output.toInputFile())
+            .project(expectedSchema)
+            .createReaderFunc(
+                type -> SparkParquetReaders.buildReader(expectedSchema, type, ID_TO_CONSTANT))
+            .build()) {
+      return Lists.newArrayList(reader);
+    }
+  }
+
+  private static ByteBuffer serialize(VariantValue value) {
+    ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
+    value.writeTo(buffer, 0);
+    return buffer;
+  }
+
+  private static Record record(GroupType type, Map<String, Object> fields) {
+    Record record = org.apache.iceberg.data.GenericRecord.create(icebergSchema(type));
+    for (Map.Entry<String, Object> entry : fields.entrySet()) {
+      record.setField(entry.getKey(), entry.getValue());
+    }
+    return record;
+  }
+
+  private static Schema icebergSchema(GroupType schema) {
+    if (schema instanceof MessageType) {
+      return ParquetSchemaUtil.convert((MessageType) schema);
+
+    } else {
+      MessageType messageType =
+          org.apache.parquet.schema.Types.buildMessage()
+              .addFields(schema.getFields().toArray(new org.apache.parquet.schema.Type[0]))
+              .named(schema.getName());
+      return ParquetSchemaUtil.convert(messageType);
+    }
+  }
+
+  private static MessageType parquetSchema(org.apache.parquet.schema.Type variantType) {
Review Comment:
These static methods are all from `TestVariantReaders`, right? Should they be shared? This is quite a bit of test code just to validate that the readers are built correctly using the Parquet variant reader builder.
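
For example, the duplicated helpers could live in one shared test utility, roughly like this (a sketch only; `VariantTestHelpers` is a placeholder name and location, not an existing class):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.variants.VariantValue;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;

/** Hypothetical shared home for the helpers duplicated with TestVariantReaders. */
public final class VariantTestHelpers {
  private VariantTestHelpers() {}

  /** Serializes a variant value into a little-endian buffer, as in serialize() above. */
  public static ByteBuffer serialize(VariantValue value) {
    ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
    value.writeTo(buffer, 0);
    return buffer;
  }

  /** Builds a generic Iceberg record for a Parquet group type from a field map. */
  public static Record record(GroupType type, Map<String, Object> fields) {
    Record record = GenericRecord.create(icebergSchema(type));
    fields.forEach(record::setField);
    return record;
  }

  /** Converts a Parquet group to an Iceberg schema, wrapping plain groups in a message first. */
  public static Schema icebergSchema(GroupType schema) {
    if (schema instanceof MessageType) {
      return ParquetSchemaUtil.convert((MessageType) schema);
    }

    MessageType messageType =
        org.apache.parquet.schema.Types.buildMessage()
            .addFields(schema.getFields().toArray(new org.apache.parquet.schema.Type[0]))
            .named(schema.getName());
    return ParquetSchemaUtil.convert(messageType);
  }
}
```

The schema-building helpers used above (`field`, `objectFields`, `variant`, `shreddedPrimitive`, `parquetSchema`) could move the same way, which would keep this test focused on the Spark reader behavior being validated.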