I don't think extending the class is necessary. I'm not sure I understand why a simple type cast for withValueDeserializerAndCoder doesn't work. Have you tried this?
p.apply(KafkaIO.<String, Envelope>read()
    .withValueDeserializerAndCoder(
        (Class<? extends Deserializer<Envelope>>) (Class<?>) KafkaAvroDeserializer.class,
        AvroCoder.of(Envelope.class))

On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> wrote:

> Hi Raghu
>
> I tried that but with KafkaAvroDeserializer already implementing
> Deserializer<Object> I couldn't get it to work... I didn't spend too much
> time though and agree something like that would be cleaner.
>
> Cheers,
> Tim
>
> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>
>> Thanks Tim.
>>
>> How about extending KafkaAvroDeserializer rather
>> than AbstractKafkaAvroDeserializer?
>>
>> The TypedKafkaAvroDeserializer class below is useful, but not directly
>> usable yet. It needs to store the actual type in Kafka consumer config to
>> retrieve at run time.
>> Even without storing the class, it is still useful. It simplifies user
>> code:
>>
>> public class EnvelopeKafkaAvroDeserializer extends
>>     TypedKafkaAvroDeserializer<Envelope> {}
>>
>> This should be part of the same package as KafkaAvroDeserializer
>> (surprised it is not there yet).
>>
>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>
>>> Happy to hear
>>>
>>> I wonder if we could do something like this (totally untested):
>>>
>>> public class TypedKafkaAvroDeserializer<T> extends
>>>     AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>   @Override
>>>   public T deserialize(String s, byte[] bytes) {
>>>     return (T) this.deserialize(bytes);
>>>   }
>>> }
>>>
>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
>>>
>>>> Thanks Tim, that works!
>>>>
>>>> Full code is:
>>>>
>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>   @Override
>>>>   public void configure(Map<String, ?> configs, boolean isKey) {
>>>>     configure(new KafkaAvroDeserializerConfig(configs));
>>>>   }
>>>>
>>>>   @Override
>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>     return (Envelope) this.deserialize(bytes);
>>>>   }
>>>>
>>>>   @Override
>>>>   public void close() {}
>>>> }
>>>>
>>>> Nicer than my solution, so I think that is the one I'm going to go with
>>>> for now.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>
>>>> Hi Andrew,
>>>>
>>>> I also saw the same behaviour.
>>>>
>>>> It's not pretty but perhaps try this? It was my last idea; I ran out of
>>>> time to try it...
>>>>
>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>> public class EnvelopeAvroDeserializer extends
>>>>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>   ...
>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>     return (Envelope) this.deserialize(bytes);
>>>>   }
>>>>
>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>   }
>>>>   ...
>>>> }
>>>>
>>>> Tim
>>>>
>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
>>>>
>>>> Using Object doesn't work unfortunately. I get an 'Unable to
>>>> automatically infer a Coder' error at runtime.
>>>> This is the code:
>>>>
>>>> p.apply(KafkaIO.<String, Object>read()
>>>>     .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>
>>>> It compiles, but at runtime:
>>>>
>>>> Caused by: java.lang.RuntimeException: Unable to automatically infer a
>>>> Coder for the Kafka Deserializer class
>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder registered
>>>> for type class java.lang.Object
>>>>   at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>
>>>> So far the only thing I've got working is this, where I use the
>>>> ByteArrayDeserializer and then parse the Avro myself:
>>>>
>>>> private static KafkaAvroDecoder avroDecoder;
>>>> static {
>>>>   final Properties props = new Properties();
>>>>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>   props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>>>   props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>   VerifiableProperties vProps = new VerifiableProperties(props);
>>>>   avroDecoder = new KafkaAvroDecoder(vProps);
>>>> }
>>>>
>>>> public static void main(String[] args) {
>>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>>   Pipeline p = Pipeline.create(options);
>>>>
>>>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>>>       .withBootstrapServers("kafka:9092")
>>>>       .withTopic("dbserver1.inventory.customers")
>>>>       .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>       .withValueDeserializer(ByteArrayDeserializer.class)
>>>>       .withoutMetadata())
>>>>     .apply(Values.<byte[]>create())
>>>>     .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>       @ProcessElement
>>>>       public void processElement(ProcessContext c) {
>>>>         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>         c.output(data);
>>>>       }
>>>>     }))
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>
>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>
>>>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>>>> though I suppose with proper configuration that Object will at run-time be
>>>> your desired type. Have you tried adding some Java type casts to make it
>>>> compile?
>>>>
>>>> +1, a cast might be the simplest fix. Alternately you can wrap or
>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>>>> returned by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
>>>>
>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>
>>>> I just tried quickly and see the same as you, Andrew.
>>>> We're missing something obvious, or else extending
>>>> KafkaAvroDeserializer seems necessary, right?
>>>>
>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <andrew+b...@andrew-jones.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>>>> it should be as simple as:
>>>>
>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>         AvroCoder.of(Envelope.class))
>>>>
>>>> Where Envelope is the name of the Avro class. However, that does not
>>>> compile and I get the following error:
>>>>
>>>> incompatible types:
>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>> cannot be converted to java.lang.Class<? extends
>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>
>>>> I've tried a number of variations on this theme but haven't yet worked
>>>> it out and am starting to run out of ideas...
>>>>
>>>> Has anyone successfully read Avro data from Kafka?
>>>>
>>>> The code I'm using can be found at
>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>>>> environment can be created with Docker.
>>>>
>>>> Thanks,
>>>> Andrew
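The TypedKafkaAvroDeserializer idea discussed in this thread can be sketched in plain Java without any Kafka or Confluent dependency. The class names below (AbstractObjectDeserializer, TypedDeserializer, TypedDeserializerSketch) are illustrative stand-ins, not the real API: a base class that only knows how to produce Object, a generic subclass that narrows the result with a single unchecked cast, and a per-type subclass that then needs no body at all.

```java
// Sketch of the generic-cast pattern: the unchecked cast is written once,
// in the generic adapter, and concrete subclasses are one-liners.
interface Deserializer<T> {
    T deserialize(String topic, byte[] bytes);
}

// Stand-in for AbstractKafkaAvroDeserializer: can only resolve bytes to Object.
abstract class AbstractObjectDeserializer {
    protected Object deserialize(byte[] bytes) {
        // Real code would decode Avro via the schema registry; this sketch
        // just decodes UTF-8 so the example is self-contained.
        return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }
}

// The generic adapter, analogous to the thread's TypedKafkaAvroDeserializer<T>.
class TypedDeserializer<T> extends AbstractObjectDeserializer
        implements Deserializer<T> {
    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(String topic, byte[] bytes) {
        // One-arg call resolves to the superclass deserialize(byte[]).
        return (T) this.deserialize(bytes);
    }
}

// A per-type subclass needs no body, as Raghu noted.
class StringValueDeserializer extends TypedDeserializer<String> {}

public class TypedDeserializerSketch {
    public static void main(String[] args) {
        Deserializer<String> d = new StringValueDeserializer();
        String value = d.deserialize("some-topic", "hello".getBytes());
        System.out.println(value); // prints "hello"
    }
}
```

Note that the cast compiles cleanly because generics are erased at runtime; as Eugene's message points out, it only succeeds if configuration actually makes the base class produce the expected type.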