Hi,

Thanks for the replies. I understand that using GenericRecord is not advisable 
and that it is what causes the Kryo fallback. But if not GenericRecord, how 
should I go about writing an AvroSchemaRegistrySchema when I don't know the 
schema? Without knowing the schema, I can't create a class. Can you suggest a 
way around that?

Thanks!

> On 02-Mar-2020, at 2:14 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> 
> Hi Nitish,
> 
> Just to expand slightly on Arvid's reply: as Arvid said, the Kryo serializer 
> comes from the call to TypeExtractor.getForClass(classOf[GenericRecord]). 
> Since GenericRecord is not a POJO, this call produces a GenericTypeInfo, 
> which uses Kryo serialization.
> 
> For a reference example I would recommend having a look at 
> AvroDeserializationSchema. There we use GenericRecordAvroTypeInfo for working 
> with GenericRecords. One important note: GenericRecords are not the best 
> candidates for data objects in Flink. The reason is that if you apply any 
> transformation to a GenericRecord, e.g. map/flatMap, the input type 
> information cannot be forwarded, as the transformation is a black box from 
> Flink's perspective. Therefore you would need to provide the type information 
> for every step of the pipeline:
> 
> TypeInformation<?> info = ...
> 
> sEnv.addSource(...) // produces info
>     .map(...)
>     .returns(info) // must be provided again: the map transformation is a
>                    // black box and might produce a completely different record
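> 
> A minimal Scala sketch of the same pattern, for illustration only. It assumes 
> flink-avro and the Scala DataStream API are on the classpath, and that the 
> schema is available as a JSON string when the job is built. The names 
> GenericRecordPipelineSketch, schemaJson, source and step are hypothetical. In 
> the Scala API the type information is an implicit parameter, so it is passed 
> explicitly here where the Java API would call returns(info):
> 
> ```scala
> import org.apache.avro.Schema
> import org.apache.avro.generic.GenericRecord
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
> import org.apache.flink.streaming.api.functions.source.SourceFunction
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> 
> object GenericRecordPipelineSketch {
> 
>   // Builds Avro-aware type information from a schema given as JSON
>   // (assumption: the schema is known when the job graph is constructed).
>   def typeInfoFor(schemaJson: String): TypeInformation[GenericRecord] =
>     new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaJson))
> 
>   // `source` and `step` are hypothetical placeholders for a real source
>   // and a real transformation that keeps the record schema unchanged.
>   def build(env: StreamExecutionEnvironment,
>             source: SourceFunction[GenericRecord],
>             step: GenericRecord => GenericRecord,
>             schemaJson: String): DataStream[GenericRecord] = {
>     val info = typeInfoFor(schemaJson)
>     env
>       .addSource(source)(info) // the source produces `info`
>       .map(step)(info)         // provided again for the output of map
>   }
> }
> ```
> 
> If a step changes the record layout, it would need its own 
> GenericRecordAvroTypeInfo built from the output schema.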
> 
> Hope that helps a bit.
> 
> Best,
> 
> Dawid
> 
> On 02/03/2020 09:04, Arvid Heise wrote:
>> Hi Nitish,
>> 
>> Kryo is the fallback serializer of Flink when everything else fails. In 
>> general, performance suffers quite a bit, and it is not always applicable, 
>> as in your case. Especially in production code, it's best to avoid it 
>> completely.
>> 
>> In your case, the issue is that your provided type information is completely 
>> meaningless. getProducedType is not providing any actual type information 
>> but just references to a generic skeleton. Flink uses the type information 
>> to reason about the value structures, which it cannot in your case.
>> 
>> If you really need to resort to a completely generic serializer (which is 
>> usually not needed), then you have a few options:
>> * Easiest: stick to byte[] and convert in a downstream UDF. If it's that 
>> generic, you probably have only a simple transformation before outputting 
>> it into some generic Kafka sink. So your UDF deserializes, does some 
>> generic stuff, and immediately turns it back into byte[].
>> * Implement your own generic TypeInformation with serializer. 
>> WritableTypeInfo [1] is a generic example of how to do it. This will 
>> automatically convert byte[] back and forth to GenericRecord. That would be 
>> the recommended way when you have multiple transformations between source 
>> and sink.
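>> 
>> For the byte[] route, a small sketch of the first thing such a UDF would 
>> have to do: split a raw Kafka value into its schema id and its Avro 
>> payload. It assumes the messages use the standard Confluent wire format 
>> (one magic byte 0, a 4-byte big-endian schema id, then the Avro binary 
>> body). ConfluentFraming and split are hypothetical names, and only the 
>> standard library is used:
>> 
>> ```scala
>> import java.nio.ByteBuffer
>> 
>> object ConfluentFraming {
>>   // Assumed layout: magic byte (0), 4-byte big-endian schema id, Avro body.
>>   private val MagicByte: Byte = 0
>>   private val HeaderLength = 5
>> 
>>   /** Returns (schema id, Avro payload), or None if the bytes are not framed. */
>>   def split(message: Array[Byte]): Option[(Int, Array[Byte])] =
>>     if (message == null || message.length < HeaderLength || message(0) != MagicByte) {
>>       None
>>     } else {
>>       // ByteBuffer reads big-endian by default, matching the wire format.
>>       val schemaId = ByteBuffer.wrap(message, 1, 4).getInt
>>       Some((schemaId, message.drop(HeaderLength)))
>>     }
>> }
>> ```
>> 
>> The schema id can then be used inside the UDF to look up the writer schema 
>> in the registry, so no schema has to be known when the job is built.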
>> 
>> [1] 
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
>> 
>> On Mon, Mar 2, 2020 at 8:44 AM Nitish Pant <nitishpant...@gmail.com> wrote:
>> Hi all,
>> 
>> I am trying to use Flink to read Avro data from Kafka, with the schemas 
>> stored in the Kafka schema registry. Since the producer for Kafka is a 
>> totally different service (an MQTT consumer that sinks to Kafka), I can't 
>> have the schema with me at the consumer end. I read around and arrived at 
>> the following implementation of KeyedDeserializationSchema, but I cannot 
>> understand why it's throwing a `com.esotericsoftware.kryo.KryoException: 
>> java.lang.NullPointerException`.
>> 
>> class AvroDeserializationSchema(schemaRegistryUrl: String)
>>     extends KeyedDeserializationSchema[GenericRecord] {
>> 
>>   // Flink needs the serializer to be serializable =>
>>   // this "@transient lazy val" does the trick
>>   @transient lazy val valueDeserializer = {
>>     val deserializer = new KafkaAvroDeserializer(
>>       new CachedSchemaRegistryClient(
>>         schemaRegistryUrl,
>>         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT))
>>     deserializer.configure(
>>       Map(
>>         AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
>>         KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false).asJava,
>>       false)
>>     deserializer
>>   }
>> 
>>   override def isEndOfStream(nextElement: GenericRecord): Boolean = false
>> 
>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>>       topic: String, partition: Int, offset: Long): GenericRecord = {
>>     // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
>>     val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
>>     value
>>   }
>> 
>>   override def getProducedType: TypeInformation[GenericRecord] =
>>     TypeExtractor.getForClass(classOf[GenericRecord])
>> }
>> 
>> I have no clue how to go about solving this. I saw a lot of people trying to 
>> implement the same. If someone can guide me, it’d be really helpful.
>> 
>> Thanks!
>> Nitish
