[ https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497992#comment-17497992 ]

Dawid Wysakowicz commented on FLINK-26349:
------------------------------------------

Yes, and why is that not supported? 

> AvroParquetReaders does not work with ReflectData
> -------------------------------------------------
>
>                 Key: FLINK-26349
>                 URL: https://issues.apache.org/jira/browse/FLINK-26349
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.15.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Jing Ge
>            Priority: Critical
>             Fix For: 1.15.0
>
>
> I tried to change the {{AvroParquetFileReadITCase}} to read the data as {{ReflectData}} and stumbled on a problem. The scenario is that I use the exact same code for writing the Parquet files, but changed the reading part to:
> {code}
>     public static final class User {
>         private final String name;
>         private final Integer favoriteNumber;
>         private final String favoriteColor;
>
>         public User(String name, Integer favoriteNumber, String favoriteColor) {
>             this.name = name;
>             this.favoriteNumber = favoriteNumber;
>             this.favoriteColor = favoriteColor;
>         }
>     }
>
>         final FileSource<User> source =
>                 FileSource.forRecordStreamFormat(
>                                 AvroParquetReaders.forReflectRecord(User.class),
>                                 Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
>                         .monitorContinuously(Duration.ofMillis(5))
>                         .build();
> {code}
> I get an error:
> {code}
> 819020 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail:
> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>       at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>       at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
>       at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>       at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>       at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>       at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
>       at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>       at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>       at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>       ... 30 more
> {code}
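>
> For reference, a minimal sketch of what the GenericRecord-based variant of the same reading code looks like; only {{forReflectRecord}} is swapped for {{forGenericRecord}}, and the inline schema below is an assumption matching the {{User}} fields above:
> {code}
> // Needed imports (assumption): org.apache.avro.Schema, org.apache.avro.generic.GenericRecord,
> // org.apache.flink.connector.file.src.FileSource, org.apache.flink.core.fs.Path,
> // org.apache.flink.formats.parquet.avro.AvroParquetReaders, java.time.Duration.
>
> // Sketch only: the schema string is an assumption matching the User fields above.
> final Schema schema =
>         new Schema.Parser()
>                 .parse(
>                         "{\"type\": \"record\", \"name\": \"User\", \"fields\": ["
>                                 + " {\"name\": \"name\", \"type\": \"string\"},"
>                                 + " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"]},"
>                                 + " {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"]}"
>                                 + "]}");
>
> // Same FileSource setup as above, but reading GenericRecord instead of the reflect type.
> final FileSource<GenericRecord> source =
>         FileSource.forRecordStreamFormat(
>                         AvroParquetReaders.forGenericRecord(schema),
>                         Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
>                 .monitorContinuously(Duration.ofMillis(5))
>                 .build();
> {code}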



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
