Re: KafkaIO and Avro
Thanks Eugene, that worked perfectly!

Full final code at https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java.

Thanks,
Andrew

On Fri, Oct 20, 2017, at 05:10 PM, Eugene Kirpichov wrote:
> This is due to Java doing type erasure in any expression that involves
> a raw type. This will compile if you extract the result of
> .apply(KafkaIO.read()...) into a local variable.
Re: KafkaIO and Avro
This is due to Java doing type erasure in any expression that involves a raw type. This will compile if you extract the result of .apply(KafkaIO.read()...) into a local variable.

On Fri, Oct 20, 2017, 1:51 AM Andrew Jones wrote:
> Thanks Eugene. That does compile, although the rest of the pipeline
> doesn't seem happy.
>
> The next line is:
>
>     .apply(Values.create())
>
> But that now doesn't compile with the following error:
>
>     /usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17]
>     cannot find symbol
>       symbol: method apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
>       location: interface org.apache.beam.sdk.values.POutput
>
> Don't really understand what's wrong here. It works fine when using
> the EnvelopeKafkaAvroDeserializer as suggested by Tim.
>
> Thanks,
> Andrew
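A minimal, library-free sketch of the effect Eugene describes. The `listOf` helper and the class name are hypothetical and are not part of Beam or Kafka; they only stand in for a generic builder method that receives a raw `Class`. Passing a raw argument makes the call an unchecked invocation, so the static type of the whole expression is erased, and anything chained onto it loses its type parameters. Assigning the result to a typed local variable restores them, at the cost of an unchecked warning.

```java
import java.util.ArrayList;
import java.util.List;

final class RawTypeErasureDemo {
    // Hypothetical generic factory, standing in for a builder that takes a Class<T>.
    static <T> List<T> listOf(Class<T> type, T item) {
        List<T> out = new ArrayList<>();
        out.add(type.cast(item));
        return out;
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static int lengthViaLocalVariable() {
        // The raw (Class) argument makes this an unchecked invocation, so the
        // expression has the raw type List rather than List<String>.
        // Chaining directly on it does not compile, because get(0) is an Object:
        //   listOf((Class) String.class, "avro").get(0).length();

        // Extracting the result into a typed local variable compiles
        // (with an unchecked warning) and gives the later calls their types back.
        List<String> typed = listOf((Class) String.class, "avro");
        return typed.get(0).length();
    }

    public static void main(String[] args) {
        System.out.println(lengthViaLocalVariable());
    }
}
```

In the pipeline discussed here, the analogous step would be to declare a local variable for the result of the first `apply` and call `.apply(Values.create())` on that variable.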
Re: KafkaIO and Avro
Thanks Eugene. That does compile, although the rest of the pipeline doesn't seem happy.

The next line is:

    .apply(Values.create())

But that now doesn't compile with the following error:

    /usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17]
    cannot find symbol
      symbol: method apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
      location: interface org.apache.beam.sdk.values.POutput

Don't really understand what's wrong here. It works fine when using the EnvelopeKafkaAvroDeserializer as suggested by Tim.

Thanks,
Andrew

On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
> Thanks Eugene
>
> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi wrote:
>> Ah, nice. It works.
>>
>> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov wrote:
>>> The following compiles fine:
>>>
>>> p.apply(KafkaIO.read()
>>>     .withBootstrapServers("kafka:9092")
>>>     .withTopic("dbserver1.inventory.customers")
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class,
>>>         AvroCoder.of(Envelope.class))
Re: KafkaIO and Avro
Thanks Eugene

On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi wrote:
> Ah, nice. It works.
>
> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov wrote:
>> The following compiles fine:
>>
>> p.apply(KafkaIO.read()
>>     .withBootstrapServers("kafka:9092")
>>     .withTopic("dbserver1.inventory.customers")
>>     .withKeyDeserializer(StringDeserializer.class)
>>     .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class,
>>         AvroCoder.of(Envelope.class))
Re: KafkaIO and Avro
Ah, nice. It works.

On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov wrote:
> The following compiles fine:
>
> p.apply(KafkaIO.read()
>     .withBootstrapServers("kafka:9092")
>     .withTopic("dbserver1.inventory.customers")
>     .withKeyDeserializer(StringDeserializer.class)
>     .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class,
>         AvroCoder.of(Envelope.class))
Re: KafkaIO and Avro
The following compiles fine:

p.apply(KafkaIO.read()
    .withBootstrapServers("kafka:9092")
    .withTopic("dbserver1.inventory.customers")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class,
        AvroCoder.of(Envelope.class))

On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi wrote:
> Same for me. It does not look like there is an annotation to suppress the
> error.
>
> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson wrote:
>> Hi Eugene,
>>
>> I understood that was where Andrew started and reported this. I tried
>> and saw the same as him.
>>
>> incompatible types:
>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>> cannot be converted to
>> org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>
>> similarly with
>> (Class<...>) KafkaAvroDeserializer.class
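A rough, library-free sketch of why the raw `(Class)` cast in Eugene's snippet gets past the compiler. Everything here is a hypothetical stand-in: `ObjectSupplier` plays the role of a deserializer that only promises `Object`, and `describe` is assumed to be shaped roughly like `withValueDeserializerAndCoder`, where the class token and the second argument have to agree on the element type. It is not the Beam API itself.

```java
import java.util.function.Supplier;

final class RawClassCastDemo {
    // Hypothetical stand-in for an untyped deserializer: it only promises Object.
    static final class ObjectSupplier implements Supplier<Object> {
        @Override
        public Object get() {
            return "payload";
        }
    }

    // Hypothetical stand-in for an API whose two arguments must agree on T.
    static <T> String describe(Class<? extends Supplier<T>> supplierClass, Class<T> elementType) {
        return supplierClass.getSimpleName() + " -> " + elementType.getSimpleName();
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static String describeWithRawCast() {
        // Does not compile: ObjectSupplier is a Supplier<Object>, so the two
        // arguments disagree about T (Object versus String).
        //   describe(ObjectSupplier.class, String.class);

        // Casting the class token to the raw Class type turns the call into an
        // unchecked invocation; T is then taken from the second argument only.
        return describe((Class) ObjectSupplier.class, String.class);
    }

    public static void main(String[] args) {
        System.out.println(describeWithRawCast());
    }
}
```

The compiler no longer checks that the supplier really produces a `String`, so a mismatch would only show up at run time. In the thread's case the second argument (the Avro coder for `Envelope`) is what carries the real element type.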
Re: KafkaIO and Avro
Same for me. It does not look like there is an annotation to suppress the error.

On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson wrote:
> Hi Eugene,
>
> I understood that was where Andrew started and reported this. I tried and
> saw the same as him.
>
> incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
> cannot be converted to
> org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>
> similarly with
> (Class<...>) KafkaAvroDeserializer.class
Re: KafkaIO and Avro
Hi Eugene,

I understood that was where Andrew started and reported this. I tried and saw the same as him:

    incompatible types: java.lang.Class cannot be converted to
    org.apache.kafka.common.serialization.Deserializer

Similarly with (Class<Deserializer<Envelope>>) KafkaAvroDeserializer.class.

On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov wrote:
> I don't think extending the class is necessary. Not sure I understand why
> a simple type cast for withValueDeserializerAndCoder doesn't work? Have you
> tried this?
>
>     p.apply(KafkaIO.read()
>         .withValueDeserializerAndCoder(
>             (Deserializer) KafkaAvroDeserializer.class,
>             AvroCoder.of(Envelope.class))
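For readers following along, here is a minimal sketch of why the two casts tried above are rejected, while a cast through the raw Class type (the form reported earlier in this thread as compiling) is accepted. Deserializer, ObjectDeserializer and CastDemo are hypothetical stand-ins, not the Kafka or Beam classes; the sketch only assumes the same shape of generic signature.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.apache.kafka.common.serialization.Deserializer.
interface Deserializer<T> {
    T deserialize(byte[] bytes);
}

// Declared as Deserializer<Object>, like KafkaAvroDeserializer in the thread.
class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

class CastDemo {
    // Assumed to mirror the parameter shape that rejects the call in the thread.
    static <V> Deserializer<V> instantiate(Class<? extends Deserializer<V>> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static String readAsString(byte[] bytes) {
        // Rejected by the compiler: a Class object is not a Deserializer instance.
        //   instantiate((Deserializer<String>) ObjectDeserializer.class);
        // Rejected by the compiler: the two Class<...> type arguments are distinct types.
        //   instantiate((Class<Deserializer<String>>) ObjectDeserializer.class);
        // Accepted with an unchecked warning: the raw Class type turns off generic checking.
        Deserializer<String> d = instantiate((Class) ObjectDeserializer.class);
        return d.deserialize(bytes);
    }
}
```

The raw cast only moves the problem to run time: it works here because the stand-in really does return a String, just as the real deserializer has to be configured to return the expected record type.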
Re: KafkaIO and Avro
I don't think extending the class is necessary. Not sure I understand why a simple type cast for withValueDeserializerAndCoder doesn't work? Have you tried this?

    p.apply(KafkaIO.read()
        .withValueDeserializerAndCoder(
            (Deserializer) KafkaAvroDeserializer.class,
            AvroCoder.of(Envelope.class))

On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson wrote:
> Hi Raghu
>
> I tried that but with KafkaAvroDeserializer already implementing
> Deserializer<Object> I couldn't get it to work... I didn't spend too much
> time though and agree something like that would be cleaner.
>
> Cheers,
> Tim
Re: KafkaIO and Avro
Hi Raghu

I tried that but with KafkaAvroDeserializer already implementing Deserializer<Object> I couldn't get it to work... I didn't spend too much time though and agree something like that would be cleaner.

Cheers,
Tim

On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi wrote:
> Thanks Tim.
>
> How about extending KafkaAvroDeserializer rather than
> AbstractKafkaAvroDeserializer?
>
> The TypedKafkaAvroDeserializer class below is useful, but not directly
> usable yet. It needs to store the actual type in the Kafka consumer config
> to retrieve at run time.
> Even without storing the class, it is still useful. It simplifies user
> code:
>
>     public class EnvelopeKafkaAvroDeserializer extends
>             TypedKafkaAvroDeserializer<Envelope> {}
>
> This should be part of the same package as KafkaAvroDeserializer
> (surprised it is not there yet).
Re: KafkaIO and Avro
Thanks Tim.

How about extending KafkaAvroDeserializer rather than AbstractKafkaAvroDeserializer?

The TypedKafkaAvroDeserializer class below is useful, but not directly usable yet. It needs to store the actual type in the Kafka consumer config to retrieve at run time.
Even without storing the class, it is still useful. It simplifies user code:

    public class EnvelopeKafkaAvroDeserializer extends
            TypedKafkaAvroDeserializer<Envelope> {}

This should be part of the same package as KafkaAvroDeserializer (surprised it is not there yet).

On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson wrote:
> Happy to hear
>
> I wonder if we could do something like this (totally untested):
>
>     public class TypedKafkaAvroDeserializer<T> extends AbstractKafkaAvroDeserializer
>             implements Deserializer<T> {
>         @Override
>         public T deserialize(String s, byte[] bytes) {
>             return (T) this.deserialize(bytes);
>         }
>     }
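The idea of storing the actual type in the consumer config could look roughly like the sketch below. Everything in it is an assumption for illustration: the Deserializer interface is a simplified stand-in, the config key example.value.type is made up, and decode() stands in for the untyped Avro decoding step. It is not how Kafka or the Confluent deserializer is implemented.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Simplified, hypothetical stand-in for Kafka's Deserializer interface.
interface Deserializer<T> {
    void configure(Map<String, ?> configs);
    T deserialize(byte[] bytes);
}

class ConfiguredTypedDeserializer<T> implements Deserializer<T> {
    // Hypothetical config key, not a real Kafka or Confluent setting.
    static final String VALUE_TYPE_CONFIG = "example.value.type";

    private Class<T> type;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs) {
        // Look up the target class by name so it is known at run time.
        String name = String.valueOf(configs.get(VALUE_TYPE_CONFIG));
        try {
            type = (Class<T>) Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown value type: " + name, e);
        }
    }

    // Stand-in for the untyped decoding step (returns Object, like the Avro deserializer).
    protected Object decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public T deserialize(byte[] bytes) {
        // Class.cast is a checked cast: a mismatch fails here, not later in the pipeline.
        return type.cast(decode(bytes));
    }
}
```

With the class available at run time, a mismatch between the configured type and the decoded value shows up as a ClassCastException inside deserialize rather than somewhere downstream.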
Re: KafkaIO and Avro
Happy to hear

I wonder if we could do something like this (totally untested):

    public class TypedKafkaAvroDeserializer<T> extends AbstractKafkaAvroDeserializer
            implements Deserializer<T> {
        @Override
        public T deserialize(String s, byte[] bytes) {
            return (T) this.deserialize(bytes);
        }
    }

On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones wrote:
> Thanks Tim, that works!
>
> Full code is:
>
>     public class EnvelopeKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
>             implements Deserializer<Envelope> {
>         @Override
>         public void configure(Map<String, ?> configs, boolean isKey) {
>             configure(new KafkaAvroDeserializerConfig(configs));
>         }
>
>         @Override
>         public Envelope deserialize(String s, byte[] bytes) {
>             return (Envelope) this.deserialize(bytes);
>         }
>
>         @Override
>         public void close() {}
>     }
>
> Nicer than my solution so I think that is the one I'm going to go with
> for now.
>
> Thanks,
> Andrew
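One caveat with a generic (T) cast like the one proposed above, shown as a small sketch: the cast is erased, so nothing is checked inside deserialize, and a mismatch only surfaces where the caller uses the value. TypedDeserializer, StringDeserializer and IntegerDeserializer are hypothetical stand-ins, and decode() replaces the real Avro decoding.

```java
import java.nio.charset.StandardCharsets;

class TypedDeserializer<T> {
    // Stand-in for the untyped decoding step; always produces a String here.
    protected Object decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    @SuppressWarnings("unchecked")
    public T deserialize(byte[] bytes) {
        // Erased at compile time: no run-time check happens on this line.
        return (T) decode(bytes);
    }
}

// Empty subclasses pin T, in the style of EnvelopeKafkaAvroDeserializer.
class StringDeserializer extends TypedDeserializer<String> {}

class IntegerDeserializer extends TypedDeserializer<Integer> {}

class ErasureDemo {
    // Returns true if the wrong type is only detected at the call site.
    static boolean failsAtCallSite() {
        try {
            Integer value = new IntegerDeserializer().deserialize(new byte[] {'4', '2'});
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

This is fine when the decoded records really are of the declared type, which is the situation described in the thread; it just means a misconfigured reader fails later than one might expect.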
Re: KafkaIO and Avro
Thanks Tim, that works!

Full code is:

    public class EnvelopeKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
            implements Deserializer<Envelope> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            configure(new KafkaAvroDeserializerConfig(configs));
        }

        @Override
        public Envelope deserialize(String s, byte[] bytes) {
            return (Envelope) this.deserialize(bytes);
        }

        @Override
        public void close() {}
    }

Nicer than my solution so I think that is the one I'm going to go with for now.

Thanks,
Andrew

On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
> Hi Andrew,
>
> I also saw the same behaviour.
>
> It's not pretty but perhaps try this? It was my last idea, which I ran out
> of time to try...
>
>     // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>     public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer
>             implements Deserializer<Envelope> {
>
>       ...
>
>       public Envelope deserialize(String s, byte[] bytes) {
>         return (Envelope) this.deserialize(bytes);
>       }
>
>       public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>         return (Envelope) this.deserialize(bytes, readerSchema);
>       }
>
>       ...
>
>     }
>
> Tim
Re: KafkaIO and Avro
Hi Andrew,

I also saw the same behaviour.

It's not pretty but perhaps try this? It was my last idea, which I ran out of time to try...

    // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
    public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer
            implements Deserializer<Envelope> {

      ...

      public Envelope deserialize(String s, byte[] bytes) {
        return (Envelope) this.deserialize(bytes);
      }

      public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
        return (Envelope) this.deserialize(bytes, readerSchema);
      }

      ...

    }

Tim

On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones wrote:
> Using Object doesn't work unfortunately. I get an 'Unable to automatically
> infer a Coder' error at runtime.
>
> This is the code:
>
>     p.apply(KafkaIO.read()
>         .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
>     Caused by: java.lang.RuntimeException: Unable to automatically infer a
>     Coder for the Kafka Deserializer class
>     io.confluent.kafka.serializers.KafkaAvroDeserializer:
>     no coder registered for type class java.lang.Object
>         at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
>     private static KafkaAvroDecoder avroDecoder;
>     static {
>         final Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>             "http://registry:8081");
>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>         VerifiableProperties vProps = new VerifiableProperties(props);
>         avroDecoder = new KafkaAvroDecoder(vProps);
>     }
>
>     public static void main(String[] args) {
>
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline p = Pipeline.create(options);
>
>         p.apply(KafkaIO.<byte[], byte[]>read()
>                 .withBootstrapServers("kafka:9092")
>                 .withTopic("dbserver1.inventory.customers")
>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>                 .withValueDeserializer(ByteArrayDeserializer.class)
>                 .withoutMetadata()
>             )
>             .apply(Values.<byte[]>create())
>             .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>                     c.output(data);
>                 }
>             }))
>
> Thanks,
> Andrew
Re: KafkaIO and Avro
Using Object doesn't work unfortunately. I get an 'Unable to automatically infer a Coder' error at runtime.

This is the code:

    p.apply(KafkaIO.read()
        .withValueDeserializer(KafkaAvroDeserializer.class)

It compiles, but at runtime:

    Caused by: java.lang.RuntimeException: Unable to automatically infer a
    Coder for the Kafka Deserializer class
    io.confluent.kafka.serializers.KafkaAvroDeserializer:
    no coder registered for type class java.lang.Object
        at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)

So far the only thing I've got working is this, where I use the ByteArrayDeserializer and then parse Avro myself:

    private static KafkaAvroDecoder avroDecoder;
    static {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            "http://registry:8081");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        VerifiableProperties vProps = new VerifiableProperties(props);
        avroDecoder = new KafkaAvroDecoder(vProps);
    }

    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("kafka:9092")
                .withTopic("dbserver1.inventory.customers")
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withoutMetadata()
            )
            .apply(Values.<byte[]>create())
            .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
                    c.output(data);
                }
            }))

Thanks,
Andrew

On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov wrote:
>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>> though I suppose with proper configuration that Object will at run-time
>> be your desired type. Have you tried adding some Java type casts to make
>> it compile?
>
> +1, cast might be the simplest fix. Alternately you can wrap or extend
> KafkaAvroDeserializer as Tim suggested. It would cast the Object returned
> by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
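The "no coder registered for type class java.lang.Object" message is what one would expect if the value type is read from the deserializer's declared type argument. The sketch below shows that general idea with plain reflection; it is an assumption about the mechanism, not a copy of KafkaIO.inferCoder, and Deserializer, ObjectDeserializer, StringDeserializer and InferDemo are hypothetical stand-ins.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Deserializer interface.
interface Deserializer<T> {
    T deserialize(byte[] bytes);
}

// Declared with Object, like KafkaAvroDeserializer in the thread.
class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Declared with a concrete type, like the EnvelopeKafkaAvroDeserializer wrapper.
class StringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

class InferDemo {
    // Returns the T that the class declares in "implements Deserializer<T>".
    static Type valueType(Class<? extends Deserializer<?>> deserializer) {
        for (Type t : deserializer.getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == Deserializer.class) {
                    return p.getActualTypeArguments()[0];
                }
            }
        }
        throw new IllegalArgumentException("Not a direct Deserializer implementation");
    }
}
```

Under that assumption, a class declared with Deserializer<Object> can only ever yield Object, which is why a subclass that pins the type argument, or an explicitly supplied coder, gets past the error.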
Re: KafkaIO and Avro
On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov wrote:
> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?

+1, cast might be the simplest fix. Alternately you can wrap or extend KafkaAvroDeserializer as Tim suggested. It would cast the Object returned by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
Re: KafkaIO and Avro
It seems that KafkaAvroDeserializer implements Deserializer<Object>, though I suppose with proper configuration that Object will at run-time be your desired type. Have you tried adding some Java type casts to make it compile?

On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson wrote:
> I just tried quickly and see the same as you, Andrew.
> We're missing something obvious, or else extending KafkaAvroDeserializer seems
> necessary, right?
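A rough illustration of the cast approach, again using stand-ins rather than the real Kafka and Beam classes. `readOne` is a hypothetical method whose first parameter has the same shape as the one named in the compiler error, `Class<? extends Deserializer<V>>`. `ObjectDeserializer` stands in for KafkaAvroDeserializer, and `Envelope` for the Avro class. The raw `(Class)` cast is the one that appears later in this thread. It is unchecked, so the compiler only warns, and any type mismatch would surface at runtime instead.

```java
// Stand-in (assumption) for Kafka's Deserializer<T>, reduced to one method.
interface Deserializer<T> {
    T deserialize(byte[] data);
}

// Stand-in (assumption) for the Avro-generated Envelope class.
final class Envelope {
    final int size;

    Envelope(int size) {
        this.size = size;
    }
}

// Stand-in (assumption) for KafkaAvroDeserializer: declared as Deserializer<Object>.
final class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(byte[] data) {
        return new Envelope(data.length);
    }
}

final class CastDemo {
    // Hypothetical method with the parameter shape from the error message.
    static <V> V readOne(Class<? extends Deserializer<V>> deserializerClass,
                         Class<V> valueType,
                         byte[] data) {
        try {
            Deserializer<V> deserializer =
                    deserializerClass.getDeclaredConstructor().newInstance();
            // Checked at runtime: throws ClassCastException if the value is not a V.
            return valueType.cast(deserializer.deserialize(data));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not instantiate deserializer", e);
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static Envelope readEnvelope(byte[] data) {
        // readOne(ObjectDeserializer.class, Envelope.class, data) does not compile:
        // Class<ObjectDeserializer> is not a Class<? extends Deserializer<Envelope>>.
        // The raw (Class) cast makes the call compile, but because a raw type is
        // involved the call's result type is erased to Object, hence the second cast.
        return (Envelope) readOne((Class) ObjectDeserializer.class, Envelope.class, data);
    }

    public static void main(String[] args) {
        System.out.println(readEnvelope(new byte[3]).size);
    }
}
```

The erased result type is the part that tends to surprise: once a raw type appears in the call, the generic information for the whole expression is dropped, so anything chained onto it sees Object.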
Re: KafkaIO and Avro
I just tried quickly and see the same as you, Andrew.
We're missing something obvious, or else extending KafkaAvroDeserializer seems necessary, right?

On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones wrote:
> Hi,
>
> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
> it should be as simple as:
>
> p.apply(KafkaIO.read()
>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>         AvroCoder.of(Envelope.class))
>
> Where Envelope is the name of the Avro class. However, that does not
> compile and I get the following error:
>
> incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
> cannot be converted to java.lang.Class<? extends
> org.apache.kafka.common.serialization.Deserializer<
> dbserver1.inventory.customers.Envelope>>
>
> I've tried a number of variations on this theme but haven't yet worked
> it out and am starting to run out of ideas...
>
> Has anyone successfully read Avro data from Kafka?
>
> The code I'm using can be found at
> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
> environment can be created with Docker.
>
> Thanks,
> Andrew
KafkaIO and Avro
Hi,

I'm trying to read Avro data from a Kafka stream using KafkaIO. I think it should be as simple as:

p.apply(KafkaIO.read()
    .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
        AvroCoder.of(Envelope.class))

Where Envelope is the name of the Avro class. However, that does not compile and I get the following error:

incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>

I've tried a number of variations on this theme but haven't yet worked it out and am starting to run out of ideas...

Has anyone successfully read Avro data from Kafka?

The code I'm using can be found at https://github.com/andrewrjones/debezium-kafka-beam-example and a full environment can be created with Docker.

Thanks,
Andrew