Hi,

Thanks for the replies. I get that it is not wise to use GenericRecord like this and that this is what is causing the Kryo fallback, but if not this, how should I go about writing an AvroSchemaRegistrySchema for when I don't know the schema? Without knowledge of the schema, I can't create a class. Can you suggest a way of getting around that?
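The only direction I can think of is to fetch the latest schema from the registry on the client before the job is submitted and build the type information from that. Something like this is what I have in mind (rough, untested sketch; schemaRegistryUrl and topic stand in for my actual config):

import org.apache.avro.Schema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

// Ask the registry for the current value schema of the topic before the
// job graph is built, and derive the Flink type information from it.
val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 100)
val schemaString = client.getLatestSchemaMetadata(s"$topic-value").getSchema
val readerSchema = new Schema.Parser().parse(schemaString)

// This would then be returned from getProducedType instead of
// TypeExtractor.getForClass(classOf[GenericRecord]).
val producedType = new GenericRecordAvroTypeInfo(readerSchema)

Would that be the intended approach, or is there a better way?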
Thanks!

> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi Nitish,
>
> Just to slightly extend on Arvid's reply: as Arvid said, the Kryo serializer comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). As GenericRecord is not a POJO, this call produces a GenericTypeInfo, which uses Kryo serialization.
>
> For a reference example I would recommend having a look at AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for working with GenericRecords. One important note: GenericRecords are not the best candidates for data objects in Flink. The reason is that if you apply any transformation to a GenericRecord, e.g. map/flatMap, the input type information cannot be forwarded, as the transformation is a black box from Flink's perspective. Therefore you would need to provide the type information for every step of the pipeline:
>
> TypeInformation<?> info = ...
>
> sEnv.addSource(...) // produces info
>   .map(...)
>   .returns(info) // must be provided again, as the map transformation is a black box; it might produce a completely different record
>
> Hope that helps a bit.
>
> Best,
> Dawid
>
> On 02/03/2020 09:04, Arvid Heise wrote:
>> Hi Nitish,
>>
>> Kryo is the fallback serializer of Flink when everything else fails. In general, performance suffers quite a bit and it's not always applicable, as in your case. Especially in production code, it's best to avoid it completely.
>>
>> In your case, the issue is that the type information you provide is effectively meaningless: getProducedType does not give Flink any actual type information, just a reference to a generic class skeleton. Flink uses the type information to reason about the value structures, which it cannot do in your case.
>>
>> If you really need to resort to a completely generic serializer (which is usually not needed), you have a few options:
>> * Easiest: stick to byte[] and convert in a downstream UDF. If it's that generic, you probably have only a simple transformation before outputting into some generic Kafka sink, so your UDF deserializes, does some generic work, and immediately turns the record back into byte[].
>> * Implement your own generic TypeInformation with a serializer. WritableTypeInfo [1] is a generic example of how to do it. This will automatically convert byte[] back and forth to GenericRecord. That would be the recommended way when you have multiple transformations between source and sink.
>>
>> [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
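>>
>> For illustration, the first option could look roughly like this (untested sketch; rawKafkaSource, genericKafkaSink, topic and the serde objects are placeholders, the deserializer/serializer would be created lazily inside the function as in your snippet, and the usual org.apache.flink.streaming.api.scala._ implicits are assumed to be in scope):
>>
>> sEnv
>>   .addSource(rawKafkaSource)                 // DataStream[Array[Byte]], value bytes passed through untouched
>>   .map { bytes: Array[Byte] =>
>>     val record = valueDeserializer.deserialize(topic, bytes).asInstanceOf[GenericRecord]
>>     // ... do the generic work on the GenericRecord here ...
>>     valueSerializer.serialize(topic, record) // immediately back to Array[Byte]
>>   }
>>   .addSink(genericKafkaSink)                 // the sink only ever sees Array[Byte]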
>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>> Hi all,
>>
>> I am trying to work with Flink to get Avro data from Kafka for which the schemas are stored in the Kafka schema registry. Since the producer for Kafka is a totally different service (an MQTT consumer that sinks to Kafka), I can't have the schema with me at the consumer end. I read around and arrived at the following implementation of KeyedDeserializationSchema, but I cannot understand why it's throwing a `com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:
>>
>> class AvroDeserializationSchema(schemaRegistryUrl: String) extends KeyedDeserializationSchema[GenericRecord] {
>>
>>   // Flink needs the deserialization schema to be serializable => this "@transient lazy val" does the trick
>>   @transient lazy val valueDeserializer = {
>>     val deserializer = new KafkaAvroDeserializer(
>>       new CachedSchemaRegistryClient(schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>>     deserializer.configure(
>>       Map(
>>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
>>       false)
>>     deserializer
>>   }
>>
>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>>
>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String, partition: Int, offset: Long): GenericRecord = {
>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>     value
>>   }
>>
>>   override def getProducedType: TypeInformation[GenericRecord] =
>>     TypeExtractor.getForClass(classOf[GenericRecord])
>> }
>>
>> I have no clue how to go about solving this. I saw a lot of people trying to implement the same. If someone can guide me, it'd be really helpful.
>>
>> Thanks!
>> Nitish