[ 
https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497992#comment-17497992
 ] 

Dawid Wysakowicz edited comment on FLINK-26349 at 2/25/22, 9:00 AM:
--------------------------------------------------------------------

Yes, and why is that not supported? In the end, the binary format on disk is 
Parquet. So what is the problem? 

Even plain Avro does not care whether data was written with GenericData, 
ReflectData, or SpecificData, as long as the schema is the same.


was (Author: dawidwys):
Yes, and why is that not supported? 

> AvroParquetReaders does not work with ReflectData
> -------------------------------------------------
>
>                 Key: FLINK-26349
>                 URL: https://issues.apache.org/jira/browse/FLINK-26349
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.15.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Jing Ge
>            Priority: Critical
>             Fix For: 1.15.0
>
>
> I tried to change the {{AvroParquetFileReadITCase}} to read the data as 
> {{ReflectData}} and I stumbled on a problem. The scenario is that I use the 
> exact same code for writing Parquet files, but changed the reading part to:
> {code}
>     public static final class User {
>         private final String name;
>         private final Integer favoriteNumber;
>         private final String favoriteColor;
>
>         public User(String name, Integer favoriteNumber, String favoriteColor) {
>             this.name = name;
>             this.favoriteNumber = favoriteNumber;
>             this.favoriteColor = favoriteColor;
>         }
>     }
>
>     final FileSource<User> source =
>             FileSource.forRecordStreamFormat(
>                             AvroParquetReaders.forReflectRecord(User.class),
>                             Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
>                     .monitorContinuously(Duration.ofMillis(5))
>                     .build();
> {code}
> I get an error:
> {code}
> 819020 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>       at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>       at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
>       at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       ... 30 more
> {code}
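
The "Caused by" frames in the trace above end in
Collections$UnmodifiableCollection.add, reached from Kryo's
CollectionSerializer.read. The following is a minimal, JDK-only sketch of that
failure mode, assuming a deserializer that fills an already-created collection
by calling add(). The names UnmodifiableAddDemo, fillByAdd and failsOnAdd are
hypothetical and only for illustration; this is not Kryo or Flink code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

public class UnmodifiableAddDemo {

    // Hypothetical stand-in for a generic collection deserializer: it is handed
    // a target collection and populates it element by element via add().
    public static <T> void fillByAdd(Collection<T> target, List<T> elements) {
        for (T element : elements) {
            target.add(element);
        }
    }

    // Returns true if populating the target fails with
    // UnsupportedOperationException, false if all elements were added.
    public static <T> boolean failsOnAdd(Collection<T> target, List<T> elements) {
        try {
            fillByAdd(target, elements);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "b");

        // A plain HashSet accepts add().
        System.out.println("mutable set fails: "
                + failsOnAdd(new HashSet<String>(), elements));

        // An unmodifiable view rejects add(), as in the trace above.
        System.out.println("unmodifiable set fails: "
                + failsOnAdd(Collections.unmodifiableSet(new HashSet<String>()), elements));
    }
}
```

The sketch only reproduces the innermost exception. It does not show why the
records travel through Kryo in the first place.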



--
This message was sent by Atlassian Jira
(v8.20.1#820001)