[ https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503603#comment-17503603 ]

Jing Ge edited comment on FLINK-26349 at 3/9/22, 2:04 PM:
----------------------------------------------------------

The user schema in {{AvroParquetRecordFormatTest}} is defined only for the Avro 
{{GenericRecord}} case. To support reading via {{ReflectData}}, the schema needs 
a namespace so that the reader can locate the target class for reflection.

I have updated the user schema and added a unit test to cover this case. Thanks 
for pointing it out.
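For illustration, a namespace-qualified variant of the {{User}} schema could look like the following (the namespace value is an assumption based on the test's package; the actual updated schema may differ):

```json
{
  "type": "record",
  "name": "User",
  "namespace": "org.apache.flink.formats.parquet.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favoriteNumber", "type": ["int", "null"]},
    {"name": "favoriteColor", "type": ["string", "null"]}
  ]
}
```

With the namespace present, a reflect-based reader can resolve the record to the fully-qualified class name (namespace + "." + name) and instantiate it.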

------------------------ FYI ----------------------------
The Parquet file created with the user schema has the following metadata:
{code:java}
creator:        parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:          parquet.avro.schema = {"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
extra:          writer.model.name = avro

file schema:  User
--------------------------------------------------------------------------------
name:           REQUIRED BINARY L:STRING R:0 D:0
favoriteNumber: OPTIONAL INT32 R:0 D:1
favoriteColor:  OPTIONAL BINARY L:STRING R:0 D:1

row group 1:    RC:3 TS:143 OFFSET:4
--------------------------------------------------------------------------------
name:            BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
favoriteNumber:  INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
favoriteColor:   BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
{code}
The Parquet file created from the {{Datum}} POJO class has the following metadata:
{code:java}
creator:     parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:       parquet.avro.schema = {"type":"record","name":"Datum","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"a","type":"string"},{"name":"b","type":"int"}]}
extra:       writer.model.name = avro

file schema: org.apache.flink.formats.parquet.avro.Datum
--------------------------------------------------------------------------------
a:           REQUIRED BINARY L:STRING R:0 D:0
b:           REQUIRED INT32 R:0 D:0

row group 1: RC:3 TS:73 OFFSET:4
--------------------------------------------------------------------------------
a:            BINARY UNCOMPRESSED DO:0 FPO:4 SZ:38/38/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: a, max: c, num_nulls: 0]
b:            INT32 UNCOMPRESSED DO:0 FPO:42 SZ:35/35/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
{code}
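Note that the {{User}} schema above carries no namespace (file schema: {{User}}), while {{Datum}} resolves to its fully-qualified class name. Why that matters can be shown with plain JDK reflection: a reflect-based reader roughly derives the target class name as namespace + "." + name and loads it via {{Class.forName}}, which only succeeds for a fully-qualified name. A minimal stdlib-only sketch (illustrative; not the actual Flink/Avro code path):

```java
public class NamespaceLookup {
    // Mimics how a reflect-based reader derives a class name from a record
    // schema: the fully-qualified "namespace.name" can be loaded reflectively,
    // while a bare record name (no namespace) cannot be found.
    static boolean resolves(String namespace, String name) {
        String fqcn = (namespace == null || namespace.isEmpty())
                ? name
                : namespace + "." + name;
        try {
            Class.forName(fqcn);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolves("java.lang", "String")); // true
        System.out.println(resolves(null, "String"));        // false
    }
}
```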



> AvroParquetReaders does not work with ReflectData
> -------------------------------------------------
>
>                 Key: FLINK-26349
>                 URL: https://issues.apache.org/jira/browse/FLINK-26349
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.15.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Jing Ge
>            Priority: Critical
>             Fix For: 1.15.0
>
>
> I tried to change the {{AvroParquetFileReadITCase}} to read the data as
> {{ReflectData}} and stumbled upon a problem. I use the exact same code for
> writing the parquet files, but changed the reading part to:
> {code}
>     public static final class User {
>         private final String name;
>         private final Integer favoriteNumber;
>         private final String favoriteColor;
>
>         public User(String name, Integer favoriteNumber, String favoriteColor) {
>             this.name = name;
>             this.favoriteNumber = favoriteNumber;
>             this.favoriteColor = favoriteColor;
>         }
>     }
>
>         final FileSource<User> source =
>                 FileSource.forRecordStreamFormat(
>                                 AvroParquetReaders.forReflectRecord(User.class),
>                                 Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
>                         .monitorContinuously(Duration.ofMillis(5))
>                         .build();
> {code}
> I get an error:
> {code}
> 819020 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail:
> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>       at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>       at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
>       at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       ... 30 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
