[ 
https://issues.apache.org/jira/browse/PARQUET-1254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362013#comment-17362013
 ] 

Andreas Hailu edited comment on PARQUET-1254 at 6/11/21, 8:15 PM:
------------------------------------------------------------------

Hi, I came across this ticket because I believe we are blocked by something 
very similar in AvroIndexedRecordConverter - we're using Apache Flink to read 
Parquet files given an Avro schema.

What I see in Bob's example is that the _elementWrapper_ field has its name as 
_array_ in AvroRecordConverter, so the converted Avro schema is interpreted as 
incompatible because there's no way to resolve the 2 schemata. The 
{{checkReaderWriterCompatibility}} check in 
{{AvroRecordConverter#isElementType}} therefore returns false, and an 
{{ElementConverter}} is created instead of an {{AvroRecordConverter}}.

The Parquet schema that results from converting the given Avro schema:

 
{code:java}
message record {
 required group elements (LIST) {
   repeated group array {
     required group array_element {
       required int32 someField;
     }
   }
 }
}{code}
 

What it likely should be to generate a compatible Avro schema for comparison:

 
{code:java}
message record {
 required group elements (LIST) {
   repeated group elementWrapper {
     required group array_element {
       required int32 someField;
     }
   }
 }
}{code}
I am able to read the file as expected if I force the 
{{AvroRecordConverter#isElementType}} schema compatibility condition to return 
{{true}}. Without that change it returns false, because _elementWrapper_ is 
replaced with _array_ and the schema compatibility check fails.
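
To illustrate the name mismatch described above, here is a minimal, stdlib-only sketch of the record-name rule that Avro schema resolution applies: two record schemas only resolve when the reader's name, or one of its aliases, matches the writer's name. {{RecordNameResolutionSketch}}, {{NamedRecord}} and {{namesResolve}} are hypothetical names invented for this illustration. They are not part of Avro or parquet-avro, and the sketch does not reproduce what {{checkReaderWriterCompatibility}} actually does beyond this one rule.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified model only: NOT Avro's SchemaCompatibility implementation.
final class RecordNameResolutionSketch {

    /** Hypothetical stand-in for a named Avro record schema: a name plus optional aliases. */
    static final class NamedRecord {
        final String fullName;
        final Set<String> aliases;

        NamedRecord(String fullName, String... aliases) {
            this.fullName = fullName;
            this.aliases = new HashSet<>(Arrays.asList(aliases));
        }
    }

    /** True when the reader's name, or one of the reader's aliases, equals the writer's name. */
    static boolean namesResolve(NamedRecord reader, NamedRecord writer) {
        return reader.fullName.equals(writer.fullName)
                || reader.aliases.contains(writer.fullName);
    }

    public static void main(String[] args) {
        // Name produced when the Parquet "repeated group array" is converted back to Avro.
        NamedRecord fromParquet = new NamedRecord("array");
        // Name used by the Avro schema that was supplied for reading.
        NamedRecord requested = new NamedRecord("elementWrapper");
        // Same reader schema, but declaring "array" as an alias (assumption for illustration).
        NamedRecord requestedWithAlias = new NamedRecord("elementWrapper", "array");

        System.out.println(namesResolve(requested, fromParquet));          // false
        System.out.println(namesResolve(requestedWithAlias, fromParquet)); // true
    }
}
```

The first call models the situation in this ticket: the names differ, so the schemas are treated as incompatible. The second call only shows how the alias part of the rule works in this model; whether an alias would change the behaviour of {{AvroRecordConverter#isElementType}} is not something the sketch establishes.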

 


> Unable to read deeply nested records from Parquet file with Avro interface.
> ---------------------------------------------------------------------------
>
>                 Key: PARQUET-1254
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1254
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-avro, parquet-mr
>    Affects Versions: 1.8.1
>            Reporter: Bob smith
>            Priority: Major
>
> I am attempting to read Parquet data whose schema contains a record nested 
> in a wrapper record, which is itself nested in an array. For example:
> {code:java}
> {
>   "type": "record",
>   "name": "record",
>   "fields": [
>     {
>       "name": "elements",
>       "type": {
>         "type": "array",
>         "items": {
>           "type": "record",
>           "name": "elementWrapper",
>           "fields": [
>             {
>               "name": "array_element",
>               "type": {
>                 "type": "record",
>                 "name": "element",
>                 "namespace": "test",
>                 "fields": [
>                   {
>                     "name": "someField",
>                     "type": "int"
>                   }
>                 ]
>               }
>             }
>           ]
>         }
>       }
>     }
>   ]
> }
> {code}
> When reading a parquet file with the above schema using the 
> {{ParquetFileReader}}, I can see the file has the following schema, which 
> appears to be correct:
> {code:java}
> message record {
>   required group elements (LIST) {
>     repeated group array {
>       required group array_element {
>         required int32 someField;
>       }
>     }
>   }
> }
> {code}
> However, when attempting to read records from this file with the Avro 
> interface (see below), I get an {{InvalidRecordException}}.
> {code:java}
> final ParquetReader<GenericRecord> parquetReader = 
> AvroParquetReader.<GenericRecord>builder(path).build();
> final GenericRecord read = parquetReader.read();
> {code}
> Stepping through the code, it looks like when the record is converted to 
> Avro, the field "someField" isn't in scope. Only fields at the top level of 
> the schema are in scope.
> Is it expected that Avro Parquet does not support this schema? Is this a bug 
> in the AvroRecordConverter?
> Thanks, Iain
> Stacktrace:
> {code:java}
> org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'someField' not found
>       at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:220)
>       at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:125)
>       at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:274)
>       at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:227)
>       at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:73)
>       at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:531)
>       at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:481)
>       at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
>       at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:136)
>       at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:90)
>       at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
>       at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:132)
>       at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:175)
>       at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:149)
>       at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
> {code}
> Below is the full code that creates a Parquet file with this schema, and then 
> fails to read it:
> {code:java}
>     @Test
>     @SneakyThrows
>     public void canReadWithNestedArray() {
>         final Path path = new Path("test-resources/" + UUID.randomUUID());
>         // Construct a record that defines the final nested value we can't read
>         final Schema element = Schema.createRecord("element", null, "test", 
> false);
>         element.setFields(Arrays.asList(new Schema.Field("someField", 
> Schema.create(Schema.Type.INT), null, null)));
>         // Create a wrapper for above nested record
>         final Schema elementWrapper = Schema.createRecord("elementWrapper", 
> null, null, false);
>         elementWrapper.setFields(Arrays.asList(new 
> Schema.Field("array_element", element, null, null)));
>         // Create top level field that contains array of wrapped records
>         final Schema.Field topLevelArrayOfWrappers = new 
> Schema.Field("elements", Schema.createArray(elementWrapper), null, null);
>         final Schema topLevelElement = Schema.createRecord("record", null, 
> null, false);
>         topLevelElement.setFields(Arrays.asList(topLevelArrayOfWrappers));
>         final GenericRecord genericRecord = new 
> GenericData.Record(topLevelElement);
>         // Create element
>         final GenericData.Record recordValue = new 
> GenericData.Record(element);
>         recordValue.put("someField", 5);
>         // Create element of array, wrapper containing above element
>         final GenericData.Record wrapperValue = new 
> GenericData.Record(elementWrapper);
>         wrapperValue.put("array_element", recordValue);
>         genericRecord.put(topLevelArrayOfWrappers.name(), 
> Arrays.asList(wrapperValue));
>         
>         AvroParquetWriter.Builder<GenericRecord> fileWriterBuilder = 
> AvroParquetWriter.<GenericRecord>builder(path).withSchema(topLevelElement);
>         final ParquetWriter<GenericRecord> fileWriter = 
> fileWriterBuilder.build();
>         fileWriter.write(genericRecord);
>         fileWriter.close();
>         final ParquetFileReader parquetFileReader = 
> ParquetFileReader.open(new Configuration(), path);
>         final FileMetaData fileMetaData = parquetFileReader.getFileMetaData();
>         System.out.println(fileMetaData.getSchema().toString());
>         final ParquetReader<GenericRecord> parquetReader = 
> AvroParquetReader.<GenericRecord>builder(path).build();
>         final GenericRecord read = parquetReader.read();
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
