Hi Eugene, I understood that was where Andrew started and reported this. I tried and saw the same as him.
incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope> similarly with (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote: > I don't think extending the class is necessary. Not sure I understand why > a simple type casting for withDeserializerAndCoder doesn't work? Have you > tried this? > > p.apply(KafkaIO.<String, Envelope>read() > .withValueDeserializerAndCoder((Deserializer<Envelope>) > KafkaAvroDeserializer.class, > AvroCoder.of(Envelope.class)) > > On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> > wrote: > >> Hi Raghu >> >> I tried that but with KafkaAvroDeserializer already implementing >> Deserializer<Object> I couldn't get it to work... I didn't spend too >> much time though and agree something like that would be cleaner. >> >> Cheers, >> Tim >> >> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote: >> >>> Thanks Tim. >>> >>> How about extending KafkaAvroDeserializer rather than >>> AbstractKafkaAvroDeserializer? >>> >>> TypedKafkaAvroDeserializer class below is useful, but not directly >>> usable by the yet. It needs to store the actual type in Kafka consumer >>> config to retrieve at run time. >>> Even without storing the class, it is still useful. It simplifies user >>> code: >>> >>> public class EnvelopeKafkaAvroDeserializer extends >>> TypedKafkaAvroDeserializer<Envelope> {} >>> >>> This should be part of same package as KafkaAvroDeserializer (surprised >>> it is not there yet). >>> >>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson < >>> timrobertson...@gmail.com> wrote: >>> >>>> Happy to hear >>>> >>>> I wonder if we could do something like this (totally untested): >>>> >>>> public class TypedKafkaAvroDeserializer<T> extends >>>> AbstractKafkaAvroDeserializer implements Deserializer<T> { >>>> @Override >>>> public T deserialize(String s, byte[] bytes) { >>>> return (T) this.deserialize(bytes); >>>> } >>>> } >>>> >>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones < >>>> andrew+b...@andrew-jones.com> wrote: >>>> >>>>> Thanks Tim, that works! >>>>> >>>>> Full code is: >>>>> >>>>> public class EnvelopeKafkaAvroDeserializer extends >>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> { >>>>> @Override >>>>> public void configure(Map<String, ?> configs, boolean isKey) { >>>>> configure(new KafkaAvroDeserializerConfig(configs)); >>>>> } >>>>> >>>>> @Override >>>>> public Envelope deserialize(String s, byte[] bytes) { >>>>> return (Envelope) this.deserialize(bytes); >>>>> } >>>>> >>>>> @Override >>>>> public void close() {} >>>>> } >>>>> >>>>> Nicer than my solution so think that is the one I'm going to go with >>>>> for now. >>>>> >>>>> Thanks, >>>>> Andrew >>>>> >>>>> >>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote: >>>>> >>>>> Hi Andrew, >>>>> >>>>> I also saw the same behaviour. >>>>> >>>>> It's not pretty but perhaps try this? It was my last idea I ran out of >>>>> time to try... >>>>> >>>>> >>>>> *// Basically a copy KafkaAvroDeserializer with the casts in >>>>> deserialize**public class *EnvelopeAvroDeserializer *extends >>>>> *AbstractKafkaAvroDeserializer *implements *Deserializer<Envelope> { >>>>> >>>>> ... >>>>> >>>>> *public *Envelope deserialize(String s, *byte*[] bytes) { >>>>> >>>>> *return *(Envelope) *this*.deserialize(bytes); >>>>> >>>>> } >>>>> >>>>> >>>>> >>>>> *public *Envelope deserialize(String s, *byte*[] bytes, Schema >>>>> readerSchema) { >>>>> >>>>> *return *(Envelope) *this*.deserialize(bytes, readerSchema); >>>>> >>>>> } >>>>> >>>>> >>>>> >>>>> ... >>>>> >>>>> } >>>>> >>>>> Tim >>>>> >>>>> >>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones < >>>>> andrew+b...@andrew-jones.com> wrote: >>>>> >>>>> >>>>> Using Object doesn't work unfortunately. I get an 'Unable to >>>>> automatically infer a Coder' error at runtime. >>>>> >>>>> This is the code: >>>>> >>>>> p.apply(KafkaIO.<String, Object>read() >>>>> .withValueDeserializer(KafkaAvroDeserializer.class) >>>>> >>>>> It compiles, but at runtime: >>>>> >>>>> Caused by: java.lang.RuntimeException: Unable to automatically infer a >>>>> Coder for the Kafka Deserializer class >>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer: >>>>> no coder registered for type class java.lang.Object >>>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696) >>>>> >>>>> So far the only thing I've got working is this, where I use the >>>>> ByteArrayDeserializer and then parse Avro myself: >>>>> >>>>> private static KafkaAvroDecoder avroDecoder; >>>>> static { >>>>> final Properties props = new Properties(); >>>>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, >>>>> "kafka:9092"); >>>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, >>>>> "http://registry:8081"); >>>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, >>>>> true); >>>>> VerifiableProperties vProps = new VerifiableProperties(props); >>>>> avroDecoder = new KafkaAvroDecoder(vProps); >>>>> } >>>>> >>>>> public static void main(String[] args) { >>>>> >>>>> PipelineOptions options = PipelineOptionsFactory.create(); >>>>> Pipeline p = Pipeline.create(options); >>>>> >>>>> p.apply(KafkaIO.<byte[], byte[]>read() >>>>> .withBootstrapServers("kafka:9092") >>>>> .withTopic("dbserver1.inventory.customers") >>>>> .withKeyDeserializer(ByteArrayDeserializer.class) >>>>> .withValueDeserializer(ByteArrayDeserializer.class) >>>>> .withoutMetadata( >>>>> ) >>>>> .apply(Values.<byte[]>create()) >>>>> .apply("ParseAvro", ParDo.of(new DoFn<byte[], >>>>> Envelope>() { >>>>> @ProcessElement >>>>> public void processElement(ProcessContext c) { >>>>> Envelope data = (Envelope) >>>>> avroDecoder.fromBytes(c.element()); >>>>> c.output(data); >>>>> } >>>>> })) >>>>> >>>>> Thanks, >>>>> Andrew >>>>> >>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote: >>>>> >>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov < >>>>> kirpic...@google.com> wrote: >>>>> >>>>> It seems that KafkaAvroDeserializer implements Deserializer<Object>, >>>>> though I suppose with proper configuration that Object will at run-time be >>>>> your desired type. Have you tried adding some Java type casts to make it >>>>> compile? >>>>> >>>>> >>>>> +1, cast might be the simplest fix. Alternately you can wrap or >>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object >>>>> returned by KafkaAvroDeserializer::deserializer() to Envolope at >>>>> runtime. >>>>> >>>>> >>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson < >>>>> timrobertson...@gmail.com> wrote: >>>>> >>>>> I just tried quickly and see the same as you Andrew. >>>>> We're missing something obvious or else extending KafkaAvroDeserialize >>>>> r seems necessary right? >>>>> >>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones < >>>>> andrew+b...@andrew-jones.com> wrote: >>>>> >>>>> Hi, >>>>> >>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think >>>>> it should be as simple as: >>>>> >>>>> p.apply(KafkaIO.<String, Envelope>*read*() >>>>> .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, >>>>> AvroCoder.of(Envelope.class)) >>>>> >>>>> Where Envelope is the name of the Avro class. However, that does not >>>>> compile and I get the following error: >>>>> >>>>> incompatible types: >>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> >>>>> cannot be converted to java.lang.Class<? extends >>>>> org.apache.kafka.common.serialization.Deserializer< >>>>> dbserver1.inventory.customers.Envelope>> >>>>> >>>>> I've tried a number of variations on this theme but haven't yet worked >>>>> it out and am starting to run out of ideas... >>>>> >>>>> Has anyone successfully read Avro data from Kafka? >>>>> >>>>> The code I'm using can be found at >>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full >>>>> environment can be created with Docker. >>>>> >>>>> Thanks, >>>>> Andrew >>>>> >>>>> >>>>> >>>>> >>>> >>> >>