[jira] [Comment Edited] (FLINK-26349) AvroParquetReaders does not work with ReflectData

2022-03-10 Thread Jing Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503603#comment-17503603
 ] 

Jing Ge edited comment on FLINK-26349 at 3/10/22, 1:58 PM:
---

The user schema in AvroParquetRecordFormatTest is defined only for Avro 
GenericRecord. To support reading via ReflectData, the schema needs a 
namespace, so that Avro can locate the class to reflect on.

I have updated the user schema and added test cases to cover this scenario. 
Thanks for pointing it out.
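
For illustration, a namespace-qualified version of the user schema could look like the following (the concrete namespace here is an assumption, borrowed from the Datum example below; the test uses whatever package the POJO actually lives in):
{code:java}
{
  "type": "record",
  "name": "User",
  "namespace": "org.apache.flink.formats.parquet.avro",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favoriteNumber", "type": ["int", "null"]},
    {"name": "favoriteColor", "type": ["string", "null"]}
  ]
}
{code}
With the namespace set, ReflectData can resolve the record to the fully qualified class name {{namespace.name}}, e.g. org.apache.flink.formats.parquet.avro.User.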

FYI, the parquet file created with the user schema has the following metadata:
{code:java}
creator:parquet-mr version 1.12.2 (build 
77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:  parquet.avro.schema = 
{"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
extra:  writer.model.name = avro

file schema:  User

name:   REQUIRED BINARY L:STRING R:0 D:0
favoriteNumber: OPTIONAL INT32 R:0 D:1
favoriteColor:  OPTIONAL BINARY L:STRING R:0 D:1

row group 1:RC:3 TS:143 OFFSET:4

name:BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 
ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
favoriteNumber:  INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 
ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
favoriteColor:   BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 
ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
{code}
The parquet file created from the Datum POJO class has the following metadata:
{code:java}
creator: parquet-mr version 1.12.2 (build 
77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra:   parquet.avro.schema = 
{"type":"record","name":"Datum","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"a","type":"string"},{"name":"b","type":"int"}]}
extra:   writer.model.name = avro

file schema: org.apache.flink.formats.parquet.avro.Datum

a:   REQUIRED BINARY L:STRING R:0 D:0
b:   REQUIRED INT32 R:0 D:0

row group 1: RC:3 TS:73 OFFSET:4

a:BINARY UNCOMPRESSED DO:0 FPO:4 SZ:38/38/1.00 VC:3 
ENC:PLAIN,BIT_PACKED ST:[min: a, max: c, num_nulls: 0]
b:INT32 UNCOMPRESSED DO:0 FPO:42 SZ:35/35/1.00 VC:3 
ENC:PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
{code}
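
A minimal sketch of what a POJO matching the Datum schema above could look like (field names mirror the parquet.avro.schema; the exact shape of the class used in the Flink test is an assumption):
{code:java}
// Hypothetical POJO for the Datum record: the class's package must equal the
// schema namespace (org.apache.flink.formats.parquet.avro) and its simple
// name must equal the record name ("Datum") for ReflectData to find it.
public class Datum {
    private String a;
    private int b;

    // Avro reflection needs a no-arg constructor to instantiate the object
    public Datum() {}

    public Datum(String a, int b) {
        this.a = a;
        this.b = b;
    }

    public String getA() { return a; }
    public int getB() { return b; }
}
{code}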



[jira] [Comment Edited] (FLINK-26349) AvroParquetReaders does not work with ReflectData

2022-02-25 Thread Dawid Wysakowicz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497992#comment-17497992
 ] 

Dawid Wysakowicz edited comment on FLINK-26349 at 2/25/22, 9:00 AM:


Yes, and why is that not supported? In the end, the binary format on disk is 
Parquet, so what is the problem?

Even pure Avro does not care whether the data was written with GenericData, 
ReflectData, or SpecificData, as long as the schema is the same.
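
To illustrate the point, a sketch of reading the same Avro container file with either data model (assuming Avro on the classpath; the file name and the User POJO are hypothetical):
{code:java}
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectDatumReader;
import java.io.File;

public class ReadWithEitherModel {
    public static void main(String[] args) throws Exception {
        // Hypothetical file written with GenericData
        File file = new File("users.avro");

        // Generic model: records come back as GenericRecord
        try (DataFileReader<GenericRecord> generic =
                new DataFileReader<>(file, new GenericDatumReader<>())) {
            generic.forEach(r -> System.out.println(r.get("name")));
        }

        // Reflect model: the same bytes are mapped onto a POJO whose fully
        // qualified name matches the schema's namespace + record name
        try (DataFileReader<User> reflect =
                new DataFileReader<>(file, new ReflectDatumReader<>(User.class))) {
            reflect.forEach(u -> System.out.println(u));
        }
    }
}
{code}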


was (Author: dawidwys):
Yes, and why is that not supported? 

> AvroParquetReaders does not work with ReflectData
> -
>
> Key: FLINK-26349
> URL: https://issues.apache.org/jira/browse/FLINK-26349
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.15.0
>Reporter: Dawid Wysakowicz
>Assignee: Jing Ge
>Priority: Critical
> Fix For: 1.15.0
>
>
> I tried to change the {{AvroParquetFileReadITCase}} to read the data as 
> {{ReflectData}} and stumbled on a problem. The scenario: I use the exact 
> same code for writing parquet files, but changed the reading part to:
> {code}
> public static final class User {
>     private final String name;
>     private final Integer favoriteNumber;
>     private final String favoriteColor;
>
>     public User(String name, Integer favoriteNumber, String favoriteColor) {
>         this.name = name;
>         this.favoriteNumber = favoriteNumber;
>         this.favoriteColor = favoriteColor;
>     }
> }
>
> final FileSource<User> source =
>         FileSource.forRecordStreamFormat(
>                         AvroParquetReaders.forReflectRecord(User.class),
>                         Path.fromLocalFile(TEMPORARY_FOLDER.getRoot()))
>                 .monitorContinuously(Duration.ofMillis(5))
>                 .build();
> {code}
> I get an error:
> {code}
> 819020 [flink-akka.actor.default-dispatcher-9] DEBUG 
> org.apache.flink.runtime.jobmaster.JobMaster [] - Archive local failure 
> causing attempt cc9f5e814ea9a3a5b397018dbffcb6a9 to fail: 
> com.esotericsoftware.kryo.KryoException: 
> java.lang.UnsupportedOperationException
> Serialization trace:
> reserved (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:402)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>   at 
> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
>   at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
>   at 
> org.ap