[ https://issues.apache.org/jira/browse/FLINK-18478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163801#comment-17163801 ]
Aljoscha Krettek edited comment on FLINK-18478 at 7/24/20, 8:15 AM:
--------------------------------------------------------------------

This is a simpler reproducer:
{code:scala}
import org.apache.flink.formats.avro.{AvroDeserializationSchema, AvroSerializationSchema}

object AvroBug {
  def main(args: Array[String]): Unit = {
    val deserSchema = AvroDeserializationSchema.forSpecific(classOf[Tweet])
    val serSchema = AvroSerializationSchema.forSpecific(classOf[Tweet])

    val tweet = Tweet(Some("a"))

    val serializedTweet = serSchema.serialize(tweet)
    val deserializedTweet: Tweet = deserSchema.deserialize(serializedTweet)

    println(s"Tweet: $deserializedTweet")
  }
}

/** MACHINE-GENERATED FROM AVRO SCHEMA. DO NOT EDIT DIRECTLY */
import scala.annotation.switch

/**
 * Twitter tweet record limited to basic information
 * @param tweet_id System-assigned numeric tweet ID. Cannot be changed by the user.
 */
final case class Tweet(var tweet_id: Option[String]) extends org.apache.avro.specific.SpecificRecordBase {
  def this() = this(None)
  def get(field$: Int): AnyRef = {
    (field$: @switch) match {
      case 0 => {
        tweet_id match {
          case Some(x) => x
          case None => null
        }
      }.asInstanceOf[AnyRef]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
  }
  def put(field$: Int, value: Any): Unit = {
    (field$: @switch) match {
      case 0 => this.tweet_id = {
        value match {
          case null => None
          case _ => Some(value.toString)
        }
      }.asInstanceOf[Option[String]]
      case _ => new org.apache.avro.AvroRuntimeException("Bad index")
    }
    ()
  }
  def getSchema: org.apache.avro.Schema = Tweet.SCHEMA$
}

object Tweet {
  val SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Tweet\",\"namespace\":\"com.github.geoheil.streamingreference\",\"doc\":\"Twitter tweet record limited to basic information\",\"fields\":[{\"name\":\"tweet_id\",\"type\":[\"null\",\"string\"],\"doc\":\"System-assigned numeric tweet ID. Cannot be changed by the user.\"}]}")
}
{code}

The problem is that Avro deserializes the {{Tweet}} as a {{GenericData$Record}}. [~dwysakowicz], do you know whether this is the expected behaviour here?

While working on the reproducer I also found another problem: FLINK-18692. The example only works once that is fixed, by changing {{checkAvroInitialized()}} there to use:
{code:java}
SpecificData specificData = new SpecificData(cl);
Schema schema = AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
{code}

> AvroDeserializationSchema does not work with types generated by avrohugger
> --------------------------------------------------------------------------
>
>                 Key: FLINK-18478
>                 URL: https://issues.apache.org/jira/browse/FLINK-18478
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.10.2, 1.12.0, 1.11.1
>
>
> The main problem is that the code in {{SpecificData.createSchema()}} tries to
> reflectively read the {{SCHEMA$}} field that is normally present in Avro-generated
> classes. However, avrohugger generates this field in a companion object, which the
> reflective Java code therefore does not find.
> This is also described in these ML threads:
> * [https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
> * [https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]
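The reflective lookup described in the issue, and the reason it misses a field declared on a Scala companion object, can be sketched without Avro or Scala on the classpath. Below is a minimal Java sketch. It assumes that {{SpecificData.createSchema()}} essentially calls {{getDeclaredField("SCHEMA$").get(null)}} on the record class. {{HasSchema}}, {{JavaGeneratedRecord}}, {{ScalaStyleRecord}} and {{Companion}} are hypothetical stand-ins, with a {{String}} playing the role of {{org.apache.avro.Schema}}. {{Companion}} only approximates the module class that scalac emits for a companion object. The instance-based fallback in {{extractSchema}} is one possible workaround under these assumptions. It is not necessarily what {{AvroFactory.extractAvroSpecificSchema}} does.

{code:java}
import java.lang.reflect.Field;
import java.util.Optional;

public class SchemaLookupSketch {

    // Hypothetical stand-in for Avro's SpecificRecord; only getSchema() matters here.
    // A plain String plays the role of org.apache.avro.Schema to avoid any dependency.
    public interface HasSchema {
        String getSchema();
    }

    // Shaped like a class from Avro's Java code generator: SCHEMA$ is a static field.
    public static class JavaGeneratedRecord implements HasSchema {
        public static final String SCHEMA$ =
            "{\"type\":\"record\",\"name\":\"JavaGeneratedRecord\",\"fields\":[]}";

        public String getSchema() {
            return SCHEMA$;
        }
    }

    // Roughly shaped like a compiled avrohugger case class: the schema lives on a
    // singleton standing in for the companion object, not in a static field here.
    public static class ScalaStyleRecord implements HasSchema {
        public ScalaStyleRecord() {} // cf. `def this() = this(None)` in the generated Tweet

        public String getSchema() {
            return Companion.MODULE$.SCHEMA$;
        }
    }

    // Hypothetical stand-in for the companion object's module class (Tweet$ for Tweet).
    public static final class Companion {
        public static final Companion MODULE$ = new Companion();
        public final String SCHEMA$ =
            "{\"type\":\"record\",\"name\":\"ScalaStyleRecord\",\"fields\":[]}";

        private Companion() {}
    }

    // Lookup in the style the issue attributes to SpecificData.createSchema():
    // read a static SCHEMA$ field declared directly on the record class.
    public static Optional<String> schemaFromStaticField(Class<?> clazz) {
        try {
            Field field = clazz.getDeclaredField("SCHEMA$");
            return Optional.of((String) field.get(null));
        } catch (NoSuchFieldException | IllegalAccessException e) {
            return Optional.empty();
        }
    }

    // Hypothetical fallback: if there is no static field, instantiate the record via its
    // public no-arg constructor and ask the instance for its schema.
    public static String extractSchema(Class<? extends HasSchema> clazz) {
        return schemaFromStaticField(clazz).orElseGet(() -> {
            try {
                return clazz.getDeclaredConstructor().newInstance().getSchema();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
            }
        });
    }

    public static void main(String[] args) {
        System.out.println("static SCHEMA$ on JavaGeneratedRecord: "
            + schemaFromStaticField(JavaGeneratedRecord.class).isPresent());
        System.out.println("static SCHEMA$ on ScalaStyleRecord: "
            + schemaFromStaticField(ScalaStyleRecord.class).isPresent());
        System.out.println("schema via instance: " + extractSchema(ScalaStyleRecord.class));
    }
}
{code}

In this sketch the static-field lookup succeeds for {{JavaGeneratedRecord}} and finds nothing on {{ScalaStyleRecord}}. The fallback then resolves the schema through an instance. Asking an instance for its schema does not depend on where {{SCHEMA$}} is declared. The cost is that the record needs a public no-arg constructor, which avrohugger does generate ({{def this() = this(None)}} in the reproducer).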