Looks like your producer writing a Avro specfic records. Can you read the records using bundled console consumer? I think it will be simpler for you to get it returning valid records and use the same deserializer config with your KafkaIO reader.
On Fri, Sep 28, 2018 at 9:33 AM Vishwas Bm <[email protected]> wrote: > Hi Raghu, > > Thanks for the response. We are now trying with GenericAvroDeserializer > but still seeing issues. > We have a producer which sends messages to kafka in format > <String,GenericRecord>. > > Below is the code snippet, we have used at Beam KafkaIo. > > org.apache.avro.Schema schema = null; > try { > schema = new org.apache.avro.Schema.Parser().parse(new > File("Schema path")); > } catch (Exception e) { > e.printStackTrace(); > } > KafkaIO.Read<String, GenericRecord> kafkaIoRead = KafkaIO.<String, > GenericRecord>read() > > .withBootstrapServers(bootstrapServerUrl).withTopic(topicName) > .withKeyDeserializer(StringDeserializer.class) > > .withValueDeserializerAndCoder(GenericAvroDeserializer.class, > AvroCoder.of(schema)) > > .updateConsumerProperties(ImmutableMap.of("schema.registry.url", schemaUrl)) > .withTimestampPolicyFactory((tp, prevWatermark) -> new > KafkaCustomTimestampPolicy(maxDelay, > timestampInfo, prevWatermark)); > > Below is the error seen, > > Caused by: > avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.avro.AvroRuntimeException: Not a Specific class: interface > org.apache.avro.generic.GenericRecord > at > avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234) > at > avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965) > at > avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969) > at > avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829) > at > org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225) > ... 8 more > Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: > interface org.apache.avro.generic.GenericRecord > at > org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285) > at > org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594) > at > org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218) > at > org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215) > at > avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568) > at > avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350) > at > avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313) > at > avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228) > > > Can you provide some pointers on this. > > > *Thanks & Regards,* > > *Vishwas * > > > > On Fri, Sep 28, 2018 at 3:12 AM Raghu Angadi <[email protected]> wrote: > >> It is a compilation error due to type mismatch for value type. >> >> Please match key and value types for KafkaIO reader. I.e. if you have >> KafkaIO.<KeyType, ValueType>read()., 'withValueDeserializer()' needs a >> class object which extends 'Deserializer<ValueType>'. Since >> KafkaAvroDeserializer extends 'Deserializer<Object>', so your ValueType >> needs to be Object, instead of String. >> >> Btw, it might be better to use GenericAvroDeseiralizer or >> SpecificAvroDeserializer from the same package. >> >> >> On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm <[email protected]> wrote: >> >>> >>> Hi Raghu, >>> >>> The deserializer is provided by confluent >>> *io.confluent.kafka.serializers* package. >>> >>> When we set valueDeserializer as KafkaAvroDeserializer. We are getting >>> below error: >>> The method withValueDeserializer(Class<? extends >>> Deserializer<String>>) in the type KafkaIO.Read<String,String> is not >>> applicable for the arguments >>> (Class<KafkaAvroDeserializer>) >>> >>> From the error, it looks like beam does not support this deserializer. >>> Also we wanted to use schemaRegistry from confluent, is this supported >>> in Beam ? >>> >>> >>> *Thanks & Regards,* >>> *Vishwas * >>> >>> >>> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi <[email protected]> >>> wrote: >>> >>>> You can set key/value deserializers : >>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101 >>>> What are the errors you see? >>>> >>>> Also note that Beam includes AvroCoder for handling Avro records in >>>> Beam. >>>> >>>> On Thu, Sep 27, 2018 at 6:05 AM rahul patwari < >>>> [email protected]> wrote: >>>> >>>>> Hi, >>>>> >>>>> We have a usecase to read data from Kafka serialized with >>>>> KafkaAvroSerializer and schema is present in Schema Registry. >>>>> >>>>> When we are trying to use ValueDeserializer as >>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer to get GenericRecord, >>>>> we are seeing errors. >>>>> >>>>> Does KafkaIO.read() supports reading from schema registry and using >>>>> confluent KafkaAvroSerDe? >>>>> >>>>> Regards, >>>>> Rahul >>>>> >>>>
