Hi all,

I am trying to use Flink to consume Avro data from Kafka, where the
schemas are stored in the Kafka Schema Registry. Since the producer is a
completely separate service (an MQTT consumer that sinks into Kafka), I
don't have the schema available on the consumer side. After reading
around, I arrived at the following implementation of
KeyedDeserializationSchema, but I cannot understand why it throws a
`com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException`:

import scala.collection.JavaConverters._

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

class AvroDeserializationSchema(schemaRegistryUrl: String)
    extends KeyedDeserializationSchema[GenericRecord] {

  // Flink needs the deserialization schema itself to be serializable, so the
  // non-serializable KafkaAvroDeserializer is built lazily on the task
  // managers => this "@transient lazy val" does the trick.
  @transient lazy val valueDeserializer = {
    val client = new CachedSchemaRegistryClient(
      schemaRegistryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
    val deserializer = new KafkaAvroDeserializer(client)
    deserializer.configure(
      Map(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
        KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
      ).asJava,
      false) // isKey = false: this instance deserializes record values
    deserializer
  }

  override def isEndOfStream(nextElement: GenericRecord): Boolean = false

  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
      topic: String, partition: Int, offset: Long): GenericRecord = {
    // val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[String]
    valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]
  }

  override def getProducedType: TypeInformation[GenericRecord] =
    TypeExtractor.getForClass(classOf[GenericRecord])
}
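
For context, this is roughly how I wire the schema into the job. This is
just a minimal sketch assuming the Kafka 0.10 connector; the topic name,
registry URL, and consumer properties below are all placeholders:

import java.util.Properties

import org.apache.avro.generic.GenericRecord
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object AvroJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
    props.setProperty("group.id", "flink-avro-consumer")     // placeholder

    // The KeyedDeserializationSchema plugs straight into the Kafka source.
    val consumer = new FlinkKafkaConsumer010[GenericRecord](
      "my-topic",                                             // placeholder topic
      new AvroDeserializationSchema("http://localhost:8081"), // placeholder URL
      props)

    val stream: DataStream[GenericRecord] = env.addSource(consumer)
    stream.print()

    env.execute("avro-from-kafka")
  }
}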

I have no clue how to go about solving this. I have seen a lot of people
trying to implement the same thing. If someone can guide me, it would be
really helpful.

Thanks!
Nitish
