Thanks, Ismaël, for the update, and thanks, Alexey, for the enhancement. We will test it with the 2.20 release.
On Tue, 4 Feb 2020, 10:53 pm Ismaël Mejía, <ieme...@gmail.com> wrote:
> Support for Confluent Schema Registry was merged into KafkaIO today. You
> can test it with tomorrow's snapshots (version 2.20.0-SNAPSHOT) or just
> when 2.20.0 gets released. Notice that this was already possible, but
> Alexey took care of making this more user friendly because this is (was)
> a frequently requested feature by Kafka/Avro users.
>
> On Fri, Sep 28, 2018 at 6:58 PM Raghu Angadi <rang...@google.com> wrote:
>
>> Looks like your producer is writing Avro specific records.
>>
>> Can you read the records using the bundled console consumer? I think it
>> will be simpler for you to get it returning valid records and then use
>> the same deserializer config with your KafkaIO reader.
>>
>> On Fri, Sep 28, 2018 at 9:33 AM Vishwas Bm <bmvish...@gmail.com> wrote:
>>
>>> Hi Raghu,
>>>
>>> Thanks for the response. We are now trying with GenericAvroDeserializer
>>> but are still seeing issues.
>>> We have a producer which sends messages to Kafka in the format
>>> <String,GenericRecord>.
>>>
>>> Below is the code snippet we used with Beam KafkaIO.
>>>
>>> org.apache.avro.Schema schema = null;
>>> try {
>>>   schema = new org.apache.avro.Schema.Parser().parse(new File("Schema path"));
>>> } catch (Exception e) {
>>>   e.printStackTrace();
>>> }
>>> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>>>     KafkaIO.<String, GenericRecord>read()
>>>         .withBootstrapServers(bootstrapServerUrl).withTopic(topicName)
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>>>             AvroCoder.of(schema))
>>>         .updateConsumerProperties(ImmutableMap.of("schema.registry.url", schemaUrl))
>>>         .withTimestampPolicyFactory((tp, prevWatermark) ->
>>>             new KafkaCustomTimestampPolicy(maxDelay, timestampInfo, prevWatermark));
>>>
>>> Below is the error we see:
>>>
>>> Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException:
>>> org.apache.avro.AvroRuntimeException: Not a Specific class: interface
>>> org.apache.avro.generic.GenericRecord
>>>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
>>>     at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
>>>     at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
>>>     at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
>>>     at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
>>>     ... 8 more
>>> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class:
>>> interface org.apache.avro.generic.GenericRecord
>>>     at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
>>>     at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>>>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
>>>     at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
>>>     at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
>>>     at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
>>>     at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
>>>     at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
>>>
>>> Can you provide some pointers on this?
>>>
>>> Thanks & Regards,
>>> Vishwas
>>>
>>> On Fri, Sep 28, 2018 at 3:12 AM Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> It is a compilation error caused by a type mismatch for the value type.
>>>>
>>>> Please match the key and value types of the KafkaIO reader. That is, if
>>>> you have KafkaIO.<KeyType, ValueType>read(), 'withValueDeserializer()'
>>>> needs a class object that extends 'Deserializer<ValueType>'. Because
>>>> KafkaAvroDeserializer extends 'Deserializer<Object>', your ValueType
>>>> needs to be Object instead of String.
>>>>
>>>> Btw, it might be better to use GenericAvroDeserializer or
>>>> SpecificAvroDeserializer from the same package.
>>>>
>>>> On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm <bmvish...@gmail.com> wrote:
>>>>
>>>>> Hi Raghu,
>>>>>
>>>>> The deserializer is provided by Confluent's
>>>>> io.confluent.kafka.serializers package.
>>>>>
>>>>> When we set valueDeserializer to KafkaAvroDeserializer, we get the
>>>>> error below:
>>>>> The method withValueDeserializer(Class<? extends
>>>>> Deserializer<String>>) in the type KafkaIO.Read<String,String> is not
>>>>> applicable for the arguments (Class<KafkaAvroDeserializer>)
>>>>>
>>>>> From the error, it looks like Beam does not support this deserializer.
>>>>> We also want to use Confluent's Schema Registry. Is this supported
>>>>> in Beam?
>>>>>
>>>>> Thanks & Regards,
>>>>> Vishwas
>>>>>
>>>>> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>
>>>>>> You can set key/value deserializers:
>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101
>>>>>> What are the errors you see?
>>>>>>
>>>>>> Also note that Beam includes AvroCoder for handling Avro records in
>>>>>> Beam.
>>>>>>
>>>>>> On Thu, Sep 27, 2018 at 6:05 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We have a use case to read data from Kafka serialized with
>>>>>>> KafkaAvroSerializer, with the schema stored in Schema Registry.
>>>>>>>
>>>>>>> When we try to use
>>>>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer as the value
>>>>>>> deserializer to get GenericRecord, we see errors.
>>>>>>>
>>>>>>> Does KafkaIO.read() support reading from Schema Registry and using
>>>>>>> Confluent's KafkaAvroSerDe?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Rahul
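Some context on the original question. Values written by Confluent's KafkaAvroSerializer are not bare Avro bytes. Each value begins with a magic byte (0), followed by a 4-byte big-endian Schema Registry ID and then the Avro binary body. A plain Avro decoder therefore cannot read these values directly, because the writer schema first has to be fetched from the registry using that ID. The minimal standalone sketch below only splits such a message into those three parts. It neither contacts a registry nor decodes the Avro body. The class ConfluentFrame and its members are hypothetical names chosen for illustration and are not part of Beam or Confluent.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Hypothetical helper (not part of Beam or Confluent) that splits a value written by
 * Confluent's KafkaAvroSerializer into its Schema Registry ID and the Avro binary body.
 * Assumed framing: [magic byte 0x0][4-byte big-endian schema ID][Avro binary data].
 */
final class ConfluentFrame {
  static final byte MAGIC_BYTE = 0x0;
  static final int HEADER_LENGTH = 5; // 1 magic byte + 4 schema-ID bytes

  final int schemaId;
  final byte[] avroPayload;

  private ConfluentFrame(int schemaId, byte[] avroPayload) {
    this.schemaId = schemaId;
    this.avroPayload = avroPayload;
  }

  static ConfluentFrame parse(byte[] message) {
    if (message == null || message.length < HEADER_LENGTH) {
      throw new IllegalArgumentException("Message too short for Confluent framing");
    }
    ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
    byte magic = buf.get();
    if (magic != MAGIC_BYTE) {
      throw new IllegalArgumentException("Unknown magic byte: " + magic);
    }
    int schemaId = buf.getInt(); // registry ID used to look up the writer schema
    byte[] payload = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    return new ConfluentFrame(schemaId, payload);
  }

  public static void main(String[] args) {
    // Schema ID 42 followed by the Avro binary encoding of the string "a"
    // (zig-zag length 1 -> 0x02, then 'a' -> 0x61).
    byte[] message = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x02, 0x61};
    ConfluentFrame frame = parse(message);
    // Prints: schema id = 42, payload bytes = 2
    System.out.println("schema id = " + frame.schemaId
        + ", payload bytes = " + frame.avroPayload.length);
  }
}
```

In a real pipeline, the registry lookup and Avro decoding are handled by a registry-aware deserializer, such as KafkaAvroDeserializer or the Schema Registry support for KafkaIO in 2.20.0 mentioned earlier in the thread.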