It is a compilation error due to a type mismatch on the value type. Please match the key and value types of the KafkaIO reader to the deserializers, i.e. if you have KafkaIO.<KeyType, ValueType>read(), 'withValueDeserializer()' needs a class object that extends 'Deserializer<ValueType>'. Since KafkaAvroDeserializer implements 'Deserializer<Object>', your ValueType needs to be Object instead of String.
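A minimal sketch of that change (the broker, topic and Schema Registry URL are placeholders, and 'updateConsumerProperties()' may be named differently in other KafkaIO versions):

import java.util.Collections;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

// KafkaAvroDeserializer implements Deserializer<Object>, so the value type
// parameter of the read transform has to be Object rather than String.
KafkaIO.Read<String, Object> read =
    KafkaIO.<String, Object>read()
        .withBootstrapServers("localhost:9092")        // placeholder broker
        .withTopic("my-topic")                         // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(KafkaAvroDeserializer.class)
        // Schema Registry settings are passed as plain consumer properties;
        // the URL below is a placeholder.
        .updateConsumerProperties(
            Collections.<String, Object>singletonMap(
                "schema.registry.url", "http://localhost:8081"));

Note that with Object as the value type Beam cannot infer a value coder, so you may also need to set one explicitly (see AvroCoder, or 'withValueDeserializerAndCoder()').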
Btw, it might be better to use GenericAvroDeserializer or SpecificAvroDeserializer from the same package (a rough sketch is appended below the quoted thread).

On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm <bmvish...@gmail.com> wrote:
>
> Hi Raghu,
>
> The deserializer is provided by the Confluent io.confluent.kafka.serializers
> package.
>
> When we set the valueDeserializer to KafkaAvroDeserializer, we get the
> following error:
> The method withValueDeserializer(Class<? extends Deserializer<String>>)
> in the type KafkaIO.Read<String,String> is not applicable for the arguments
> (Class<KafkaAvroDeserializer>)
>
> From the error, it looks like Beam does not support this deserializer.
> Also, we wanted to use the Schema Registry from Confluent; is this supported
> in Beam?
>
> Thanks & Regards,
> Vishwas
>
> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi <rang...@google.com> wrote:
>
>> You can set key/value deserializers:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101
>> What are the errors you see?
>>
>> Also note that Beam includes AvroCoder for handling Avro records in Beam.
>>
>> On Thu, Sep 27, 2018 at 6:05 AM rahul patwari <rahulpatwari8...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We have a use case to read data from Kafka serialized with
>>> KafkaAvroSerializer, where the schema is present in Schema Registry.
>>>
>>> When we try to use io.confluent.kafka.serializers.KafkaAvroDeserializer
>>> as the value deserializer to get a GenericRecord, we are seeing errors.
>>>
>>> Does KafkaIO.read() support reading from Schema Registry and using the
>>> Confluent KafkaAvroSerDe?
>>>
>>> Regards,
>>> Rahul
>>>
>>
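Rough sketch of the GenericRecord route mentioned above (the schema, broker, topic and registry URL are placeholders, and GenericAvroDeserializer is assumed here to come from Confluent's kafka-streams-avro-serde artifact):

import java.util.Collections;

import io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

// Placeholder schema; in practice this would be the schema registered for
// the topic's values.
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"MyRecord\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

// GenericAvroDeserializer implements Deserializer<GenericRecord>, so the
// value type can be GenericRecord and AvroCoder can serve as the value coder.
KafkaIO.Read<String, GenericRecord> read =
    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers("localhost:9092")      // placeholder broker
        .withTopic("my-topic")                       // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(
            GenericAvroDeserializer.class, AvroCoder.of(schema))
        // Schema Registry URL is passed through the consumer config;
        // placeholder value below.
        .updateConsumerProperties(
            Collections.<String, Object>singletonMap(
                "schema.registry.url", "http://localhost:8081"));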