This is an automated email from the ASF dual-hosted git repository. gangwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push: new 30fa1de0b PARQUET-2450: Fix Avro projection for single-field repeated record types (#1300) 30fa1de0b is described below commit 30fa1de0b43037ce451063ea75570663fc9c663c Author: Claire McGinty <clai...@spotify.com> AuthorDate: Mon Mar 18 21:57:51 2024 -0400 PARQUET-2450: Fix Avro projection for single-field repeated record types (#1300) --- .../apache/parquet/avro/AvroRecordConverter.java | 1 + .../parquet/avro/TestArrayCompatibility.java | 130 ++++++++++++++++++++- .../apache/parquet/avro/TestSpecificReadWrite.java | 39 +++++++ 3 files changed, 168 insertions(+), 2 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 87325a0af..62d1f89fd 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -930,6 +930,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { static boolean isElementType(Type repeatedType, Schema elementSchema) { if (repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() > 1 + || repeatedType.getName().equals("array") || repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) { // The repeated type must be the element type because it is an invalid // synthetic wrapper. Must be a group with one optional or required field diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java index b4b543340..fd4cf2011 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java @@ -37,7 +37,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.DirectWriterTest; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; @@ -603,7 +605,7 @@ public class TestArrayCompatibility extends DirectWriterTest { public void testAvroCompatOptionalGroupInListWithSchema() throws Exception { Path test = writeDirect( "message AvroCompatOptionalGroupInListWithSchema {" + " optional group locations (LIST) {" - + " repeated group array {" + + " repeated group my_list {" + " optional group element {" + " required double latitude;" + " required double longitude;" @@ -616,7 +618,7 @@ public class TestArrayCompatibility extends DirectWriterTest { rc.startField("locations", 0); rc.startGroup(); - rc.startField("array", 0); // start writing array contents + rc.startField("my_list", 0); // start writing array contents // write a non-null element rc.startGroup(); // array level @@ -1103,6 +1105,130 @@ public class TestArrayCompatibility extends DirectWriterTest { assertReaderContains(newBehaviorReader(test, newDoubleSchema), newDoubleSchema, newDoubleRecord); } + @Test + public void testIsElementTypeRequiredRepeatedRecord() { + // Test `_tuple` style naming + MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" + + " required group list_field (LIST) {\n" + + " repeated group list_field_tuple (LIST) {\n" + + " repeated int32 int_field;\n" + + " }\n" + + " }\n" + + "}"); + Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema); + + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"), + avroSchema.getFields().get(0).schema())); + + // Test `array` style naming + parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" + + " required group list_field (LIST) {\n" + + " repeated group array {\n" + + " required int32 a;\n" + + " }\n" + + " }\n" + + "}"); + avroSchema = new AvroSchemaConverter().convert(parquetSchema); + + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list_field"), + avroSchema.getFields().get(0).schema())); + } + + @Test + public void testIsElementTypeOptionalRepeatedRecord() { + // Test `_tuple` style naming + MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" + + " optional group list_field (LIST) {\n" + + " repeated group list_field_tuple (LIST) {\n" + + " repeated int32 int_field;\n" + + " }\n" + + " }\n" + + "}"); + Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema); + + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"), + avroSchema.getFields().get(0).schema())); + + // Test `array` style naming + parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n" + + " optional group list_field (LIST) {\n" + + " repeated group array {\n" + + " required int32 a;\n" + + " }\n" + + " }\n" + + "}"); + avroSchema = new AvroSchemaConverter().convert(parquetSchema); + + Assert.assertTrue(AvroRecordConverter.isElementType( + parquetSchema.getType("list_field"), + avroSchema.getFields().get(0).schema())); + } + + @Test + public void testIsElementTypeFailsInvalidSchema() throws Exception { + Path test = writeDirect( + "message MessageWithInvalidArraySchema {" + + " optional group locations (LIST) {" + + " repeated group array {" + + " optional group element {" + + " required double latitude;" + + " required double longitude;" + + " }" + + " }" + + " }" + + "}", + rc -> { + rc.startMessage(); + rc.startField("locations", 0); + + rc.startGroup(); + rc.startField("array", 0); // start writing array contents + + // write a non-null element + rc.startGroup(); // array level + rc.startField("element", 0); + rc.startGroup(); + rc.startField("latitude", 0); + rc.addDouble(0.0); + rc.endField("latitude", 0); + rc.startField("longitude", 1); + rc.addDouble(180.0); + rc.endField("longitude", 1); + rc.endGroup(); + rc.endField("element", 0); + rc.endGroup(); // array level + + rc.endField("array", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("locations", 0); + rc.endMessage(); + }); + + Schema location = record( + "element", + field("latitude", primitive(Schema.Type.DOUBLE)), + field("longitude", primitive(Schema.Type.DOUBLE))); + + Schema newSchema = + record("MessageWithInvalidArraySchema", optionalField("locations", array(optional(location)))); + GenericRecord newRecord = instance( + newSchema, + "locations", + Arrays.asList( + instance(location, "latitude", 0.0, "longitude", 180.0), + instance(location, "latitude", 0.0, "longitude", 0.0))); + + Configuration oldConfWithSchema = new Configuration(); + AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema); + + AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(oldConfWithSchema, test); + Assert.assertThrows(InvalidRecordException.class, reader::read); + } + public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(Path path) throws IOException { return new AvroParquetReader<T>(OLD_BEHAVIOR_CONF, path); } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java index c7874fe53..2355847f0 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetReader; @@ -232,6 +233,44 @@ public class TestSpecificReadWrite { } } + @Test + public void testRepeatedRecordProjection() throws IOException { + Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false); + Configuration conf = new Configuration(testConf); + Schema schema = Car.getClassSchema(); + + // Project a single field from repeated record schema + final Schema projectedSchema = SchemaBuilder.builder(schema.getNamespace()) + .record("Car") + .fields() + .name("serviceHistory") + .type(SchemaBuilder.unionOf() + .nullBuilder() + .endNull() + .and() + .array() + .items(SchemaBuilder.builder(schema.getNamespace()) + .record("Service") + .fields() + .requiredString("mechanic") + .endRecord()) + .endUnion()) + .noDefault() + .endRecord(); + + AvroReadSupport.setRequestedProjection(conf, projectedSchema); + + try (ParquetReader<Car> reader = new AvroParquetReader<>(conf, path)) { + for (Car car = reader.read(); car != null; car = reader.read()) { + assertNotNull(car.getServiceHistory()); + car.getServiceHistory().forEach(service -> { + assertNotNull(service.getMechanic()); + assertEquals(0L, service.getDate()); + }); + } + } + } + @Test public void testAvroReadSchema() throws IOException { Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);