This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 30fa1de0b PARQUET-2450: Fix Avro projection for single-field repeated record types (#1300)
30fa1de0b is described below

commit 30fa1de0b43037ce451063ea75570663fc9c663c
Author: Claire McGinty <clai...@spotify.com>
AuthorDate: Mon Mar 18 21:57:51 2024 -0400

    PARQUET-2450: Fix Avro projection for single-field repeated record types (#1300)
---
 .../apache/parquet/avro/AvroRecordConverter.java   |   1 +
 .../parquet/avro/TestArrayCompatibility.java       | 130 ++++++++++++++++++++-
 .../apache/parquet/avro/TestSpecificReadWrite.java |  39 +++++++
 3 files changed, 168 insertions(+), 2 deletions(-)

diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 87325a0af..62d1f89fd 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -930,6 +930,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
   static boolean isElementType(Type repeatedType, Schema elementSchema) {
     if (repeatedType.isPrimitive()
         || repeatedType.asGroupType().getFieldCount() > 1
+        || repeatedType.getName().equals("array")
         || repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) {
       // The repeated type must be the element type because it is an invalid
       // synthetic wrapper. Must be a group with one optional or required field
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
index b4b543340..fd4cf2011 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -37,7 +37,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.DirectWriterTest;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -603,7 +605,7 @@ public class TestArrayCompatibility extends DirectWriterTest {
   public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
     Path test = writeDirect(
         "message AvroCompatOptionalGroupInListWithSchema {" + "  optional group locations (LIST) {"
-            + "    repeated group array {"
+            + "    repeated group my_list {"
             + "      optional group element {"
             + "        required double latitude;"
             + "        required double longitude;"
@@ -616,7 +618,7 @@ public class TestArrayCompatibility extends DirectWriterTest {
           rc.startField("locations", 0);
 
           rc.startGroup();
-          rc.startField("array", 0); // start writing array contents
+          rc.startField("my_list", 0); // start writing array contents
 
           // write a non-null element
           rc.startGroup(); // array level
@@ -1103,6 +1105,130 @@ public class TestArrayCompatibility extends DirectWriterTest {
     assertReaderContains(newBehaviorReader(test, newDoubleSchema), newDoubleSchema, newDoubleRecord);
   }
 
+  @Test
+  public void testIsElementTypeRequiredRepeatedRecord() {
+    // Test `_tuple` style naming
+    MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
+        + "  required group list_field (LIST) {\n"
+        + "    repeated group list_field_tuple (LIST) {\n"
+        + "      repeated int32 int_field;\n"
+        + "    }\n"
+        + "  }\n"
+        + "}");
+    Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+
+    Assert.assertTrue(AvroRecordConverter.isElementType(
+        parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
+        avroSchema.getFields().get(0).schema()));
+
+    // Test `array` style naming
+    parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
+        + "  required group list_field (LIST) {\n"
+        + "    repeated group array {\n"
+        + "      required int32 a;\n"
+        + "    }\n"
+        + "  }\n"
+        + "}");
+    avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+
+    Assert.assertTrue(AvroRecordConverter.isElementType(
+        parquetSchema.getType("list_field"),
+        avroSchema.getFields().get(0).schema()));
+  }
+
+  @Test
+  public void testIsElementTypeOptionalRepeatedRecord() {
+    // Test `_tuple` style naming
+    MessageType parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
+        + "  optional group list_field (LIST) {\n"
+        + "    repeated group list_field_tuple (LIST) {\n"
+        + "      repeated int32 int_field;\n"
+        + "    }\n"
+        + "  }\n"
+        + "}");
+    Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+
+    Assert.assertTrue(AvroRecordConverter.isElementType(
+        parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
+        avroSchema.getFields().get(0).schema()));
+
+    // Test `array` style naming
+    parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList {\n"
+        + "  optional group list_field (LIST) {\n"
+        + "    repeated group array {\n"
+        + "      required int32 a;\n"
+        + "    }\n"
+        + "  }\n"
+        + "}");
+    avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+
+    Assert.assertTrue(AvroRecordConverter.isElementType(
+        parquetSchema.getType("list_field"),
+        avroSchema.getFields().get(0).schema()));
+  }
+
+  @Test
+  public void testIsElementTypeFailsInvalidSchema() throws Exception {
+    Path test = writeDirect(
+        "message MessageWithInvalidArraySchema {"
+            + "  optional group locations (LIST) {"
+            + "    repeated group array {"
+            + "      optional group element {"
+            + "        required double latitude;"
+            + "        required double longitude;"
+            + "      }"
+            + "    }"
+            + "  }"
+            + "}",
+        rc -> {
+          rc.startMessage();
+          rc.startField("locations", 0);
+
+          rc.startGroup();
+          rc.startField("array", 0); // start writing array contents
+
+          // write a non-null element
+          rc.startGroup(); // array level
+          rc.startField("element", 0);
+          rc.startGroup();
+          rc.startField("latitude", 0);
+          rc.addDouble(0.0);
+          rc.endField("latitude", 0);
+          rc.startField("longitude", 1);
+          rc.addDouble(180.0);
+          rc.endField("longitude", 1);
+          rc.endGroup();
+          rc.endField("element", 0);
+          rc.endGroup(); // array level
+
+          rc.endField("array", 0); // finished writing array contents
+          rc.endGroup();
+
+          rc.endField("locations", 0);
+          rc.endMessage();
+        });
+
+    Schema location = record(
+        "element",
+        field("latitude", primitive(Schema.Type.DOUBLE)),
+        field("longitude", primitive(Schema.Type.DOUBLE)));
+
+    Schema newSchema =
+        record("MessageWithInvalidArraySchema", optionalField("locations", array(optional(location))));
+    GenericRecord newRecord = instance(
+        newSchema,
+        "locations",
+        Arrays.asList(
+            instance(location, "latitude", 0.0, "longitude", 180.0),
+            instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+    Configuration oldConfWithSchema = new Configuration();
+    AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);
+
+    AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(oldConfWithSchema, test);
+    Assert.assertThrows(InvalidRecordException.class, reader::read);
+  }
+
   public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(Path path) throws IOException {
     return new AvroParquetReader<T>(OLD_BEHAVIOR_CONF, path);
   }
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
index c7874fe53..2355847f0 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetReader;
@@ -232,6 +233,44 @@ public class TestSpecificReadWrite {
     }
   }
 
+  @Test
+  public void testRepeatedRecordProjection() throws IOException {
+    Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);
+    Configuration conf = new Configuration(testConf);
+    Schema schema = Car.getClassSchema();
+
+    // Project a single field from repeated record schema
+    final Schema projectedSchema = SchemaBuilder.builder(schema.getNamespace())
+        .record("Car")
+        .fields()
+        .name("serviceHistory")
+        .type(SchemaBuilder.unionOf()
+            .nullBuilder()
+            .endNull()
+            .and()
+            .array()
+            .items(SchemaBuilder.builder(schema.getNamespace())
+                .record("Service")
+                .fields()
+                .requiredString("mechanic")
+                .endRecord())
+            .endUnion())
+        .noDefault()
+        .endRecord();
+
+    AvroReadSupport.setRequestedProjection(conf, projectedSchema);
+
+    try (ParquetReader<Car> reader = new AvroParquetReader<>(conf, path)) {
+      for (Car car = reader.read(); car != null; car = reader.read()) {
+        assertNotNull(car.getServiceHistory());
+        car.getServiceHistory().forEach(service -> {
+          assertNotNull(service.getMechanic());
+          assertEquals(0L, service.getDate());
+        });
+      }
+    }
+  }
+
   @Test
   public void testAvroReadSchema() throws IOException {
     Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED, false);

Reply via email to