[ https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503603#comment-17503603 ]
Jing Ge commented on FLINK-26349:
---------------------------------

The user schema in AvroParquetRecordFormatTest is defined only for the Avro GenericRecord case. To support reading it via ReflectData, the schema needs a namespace, so that the reader can locate the Java class to reflect on. I have updated the user schema and added a unit test to cover this case. Thanks for pointing it out.

------------------------ FYI ----------------------------

The parquet file created from the user schema has the following meta:

{code:java}
creator:        parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:          parquet.avro.schema = {"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
extra:          writer.model.name = avro

file schema:    User
--------------------------------------------------------------------------------
name:           REQUIRED BINARY L:STRING R:0 D:0
favoriteNumber: OPTIONAL INT32 R:0 D:1
favoriteColor:  OPTIONAL BINARY L:STRING R:0 D:1

row group 1:    RC:3 TS:143 OFFSET:4
--------------------------------------------------------------------------------
name:           BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
favoriteNumber: INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
favoriteColor:  BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
{code}

The parquet file created from the Datum POJO class has the following meta:

{code:java}
creator:        parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:          parquet.avro.schema = {"type":"record","name":"Datum","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"a","type":"string"},{"name":"b","type":"int"}]}
extra:          writer.model.name = avro

file schema:    org.apache.flink.formats.parquet.avro.Datum
--------------------------------------------------------------------------------
a:              REQUIRED BINARY L:STRING R:0 D:0
b:              REQUIRED INT32 R:0 D:0

row group 1:    RC:3 TS:73 OFFSET:4
--------------------------------------------------------------------------------
a:              BINARY UNCOMPRESSED DO:0 FPO:4 SZ:38/38/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: a, max: c, num_nulls: 0]
b:              INT32 UNCOMPRESSED DO:0 FPO:42 SZ:35/35/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
{code}

> AvroParquetReaders does not work with ReflectData
> -------------------------------------------------
>
>                 Key: FLINK-26349
>                 URL: https://issues.apache.org/jira/browse/FLINK-26349
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.15.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Jing Ge
>            Priority: Critical
>             Fix For: 1.15.0
>
>
> I tried to change the {{AvroParquetFileReadITCase}} to read the data as
> {{ReflectData}} and I stumbled on a problem. The scenario is that I use the
> exact same code for writing parquet files, but changed the reading part to:
> {code}
> public static final class User {
>     private final String name;
>     private final Integer favoriteNumber;
>     private final String favoriteColor;
>
>     public User(String name, Integer favoriteNumber, String favoriteColor) {
>         this.name = name;
>         this.favoriteNumber = favoriteNumber;
>         this.favoriteColor = favoriteColor;
>     }
> }
>
> final FileSource<User> source =
>         FileSource.forRecordStreamFormat(
>                         AvroParquetReaders.forReflectRecord(User.class),
>                         Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
>                 .monitorContinuously(Duration.ofMillis(5))
>                 .build();
> {code}
> I get an error:
> {code}
> 819020 [flink-akka.actor.default-dispatcher-9] DEBUG org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
> 	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
> 	at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
> 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.UnsupportedOperationException
> 	at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
> 	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
> 	... 30 more
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
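P.S. As a minimal sketch of why the namespace matters for reflect reading: Avro resolves the target Java class from the schema's full name, roughly `namespace + "." + name`, via `Class.forName`; a schema without a namespace therefore cannot be mapped back to a POJO. The class and names below are illustrative stand-ins, not Flink or Avro internals:

```java
// Simplified model of the class lookup that reflect-based reading performs:
// the schema full name (namespace + "." + name) is fed to Class.forName.
// NamespaceLookupDemo and the stand-in names are hypothetical, for illustration only.
public class NamespaceLookupDemo {

    static Class<?> resolve(String namespace, String name) {
        // Without a namespace there is only the bare record name, which
        // (outside the default package) cannot be loaded.
        String fullName = (namespace == null) ? name : namespace + "." + name;
        try {
            return Class.forName(fullName);
        } catch (ClassNotFoundException e) {
            return null; // no class to reflect on -> record cannot be instantiated
        }
    }

    public static void main(String[] args) {
        // "java.util" / "ArrayList" stand in for
        // "org.apache.flink.formats.parquet.avro" / "Datum".
        System.out.println(resolve("java.util", "ArrayList")); // resolves
        System.out.println(resolve(null, "ArrayList"));        // null
    }
}
```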