Thanks Eugene, that worked perfectly!

Full final code at
https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java.

Thanks,
Andrew
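
For anyone skimming the thread, here is a minimal sketch of the behaviour Eugene describes in the quoted reply: once a raw `(Class)` argument is passed, the call's return type is erased, so anything chained onto it loses its type parameters, while assigning the result to a typed local variable restores them. The names `Deserializer`, `ObjectDeserializer`, `Read` and `decodedLength` are hypothetical stand-ins so the example runs on the JDK alone; they are not the real Kafka or Beam classes, and the real pipeline may differ in detail.

```java
import java.nio.charset.StandardCharsets;

final class RawTypeErasureDemo {

    /** Hypothetical stand-in for Kafka's Deserializer<T>. */
    public interface Deserializer<T> {
        T deserialize(byte[] bytes);
    }

    /** Stand-in for KafkaAvroDeserializer, which is declared as Deserializer<Object>. */
    public static class ObjectDeserializer implements Deserializer<Object> {
        public ObjectDeserializer() {}

        @Override
        public Object deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    /** Stand-in for a typed builder in the style of KafkaIO.Read. */
    public static class Read<V> {
        private Class<? extends Deserializer<V>> deserializerClass;

        public Read<V> withValueDeserializer(Class<? extends Deserializer<V>> cls) {
            this.deserializerClass = cls;
            return this;
        }

        public V readOne(byte[] bytes) {
            try {
                // Instantiate the deserializer reflectively, as a consumer would.
                return deserializerClass.getDeclaredConstructor().newInstance().deserialize(bytes);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static <V> Read<V> read() {
        return new Read<>();
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static int decodedLength(String text) {
        // Chained form, does NOT compile: the raw (Class) argument needs an
        // unchecked conversion, so withValueDeserializer(...) returns the raw
        // type Read, readOne(...) returns Object, and length() is not found:
        //
        //   RawTypeErasureDemo.<String>read()
        //       .withValueDeserializer((Class) ObjectDeserializer.class)
        //       .readOne(bytes).length();

        // Assigning the result to a typed local variable restores V = String.
        Read<String> typed = RawTypeErasureDemo.<String>read()
                .withValueDeserializer((Class) ObjectDeserializer.class);
        return typed.readOne(text.getBytes(StandardCharsets.UTF_8)).length();
    }

    public static void main(String[] args) {
        System.out.println(decodedLength("envelope"));
    }
}
```

The cast is still unchecked: if the deserializer produced something other than the declared type at run time, the failure would surface as a ClassCastException where the value is used, not at the cast.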

On Fri, Oct 20, 2017, at 05:10 PM, Eugene Kirpichov wrote:
> This is due to Java doing type erasure in any expression that involves
> a raw type. This will compile if you extract the result of
> .apply(KafkaIO.read()...) into a local variable.
>
> On Fri, Oct 20, 2017, 1:51 AM Andrew Jones <andrew+beam@andrew-jones.com[1]> wrote:
>> Thanks Eugene. That does compile, although the rest of the pipeline
>> doesn't seem happy.
>>
>> The next line is:
>>
>>     .apply(Values.<Envelope>create())
>>
>> But that now doesn't compile with the following error:
>>
>> /usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17] cannot find symbol
>>   symbol:   method apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
>>   location: interface org.apache.beam.sdk.values.POutput
>>
>> Don't really understand what's wrong here. It works fine when using
>> the EnvelopeKafkaAvroDeserializer as suggested by Tim.
>>
>> Thanks,
>> Andrew
>>
>> On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
>>> Thanks Eugene
>>>
>>> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote:
>>>> Ah, nice. It works.
>>>>
>>>> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> The following compiles fine:
>>>>>
>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>     .withBootstrapServers("kafka:9092")
>>>>>     .withTopic("dbserver1.inventory.customers")
>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>     .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>>>>>
>>>>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>> Same for me.
>>>>>> It does not look like there is an annotation to suppress the error.
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>> Hi Eugene,
>>>>>>>
>>>>>>> I understood that was where Andrew started and reported this. I
>>>>>>> tried and saw the same as him.
>>>>>>>
>>>>>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>>>>>>
>>>>>>> similarly with
>>>>>>> (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>> I don't think extending the class is necessary. Not sure I
>>>>>>>> understand why a simple type casting for
>>>>>>>> withDeserializerAndCoder doesn't work? Have you tried this?
>>>>>>>>
>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>     .withValueDeserializerAndCoder((Deserializer<Envelope>)KafkaAvroDeserializer.class,
>>>>>>>>         AvroCoder.of(Envelope.class))
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>> Hi Raghu
>>>>>>>>>
>>>>>>>>> I tried that but with KafkaAvroDeserializer already
>>>>>>>>> implementing Deserializer<Object> I couldn't get it to work...
>>>>>>>>> I didn't spend too much time though and agree something like
>>>>>>>>> that would be cleaner.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Tim
>>>>>>>>>
>>>>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>> Thanks Tim.
>>>>>>>>>>
>>>>>>>>>> How about extending KafkaAvroDeserializer rather than
>>>>>>>>>> AbstractKafkaAvroDeserializer?
>>>>>>>>>>
>>>>>>>>>> TypedKafkaAvroDeserializer class below is useful, but not
>>>>>>>>>> directly usable by the yet.
>>>>>>>>>> It needs to store the actual type
>>>>>>>>>> in Kafka consumer config to retrieve at run time.
>>>>>>>>>> Even without storing the class, it is still useful. It
>>>>>>>>>> simplifies user code:
>>>>>>>>>>
>>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>>>     TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>>>>>
>>>>>>>>>> This should be part of same package as KafkaAvroDeserializer
>>>>>>>>>> (surprised it is not there yet).
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>>>> Happy to hear
>>>>>>>>>>>
>>>>>>>>>>> I wonder if we could do something like this (totally
>>>>>>>>>>> untested):
>>>>>>>>>>>
>>>>>>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>>>>>>>     AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>>>>>   @Override
>>>>>>>>>>>   public T deserialize(String s, byte[] bytes) {
>>>>>>>>>>>     return (T) this.deserialize(bytes);
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>> Thanks Tim, that works!
>>>>>>>>>>>>
>>>>>>>>>>>> Full code is:
>>>>>>>>>>>>
>>>>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>>>>>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>>>   @Override
>>>>>>>>>>>>   public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>>>>>     configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   @Override
>>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   @Override
>>>>>>>>>>>>   public void close() {}
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> Nicer than my solution so think that is the one I'm going
>>>>>>>>>>>> to go with for now.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I also saw the same behaviour.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It's not pretty but perhaps try this? It was my last idea
>>>>>>>>>>>>> I ran out of time to try...
>>>>>>>>>>>>>
>>>>>>>>>>>>> // Basically a copy KafkaAvroDeserializer with the casts in deserialize
>>>>>>>>>>>>> public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>>>>
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>
>>>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>
>>>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>>>>>>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>>>>>>>   }
>>>>>>>>>>>>>
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> Tim
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>>>> Using Object doesn't work unfortunately. I get an 'Unable
>>>>>>>>>>>>>> to automatically infer a Coder' error at runtime.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is the code:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>>>>>>>>>     .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It compiles, but at runtime:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caused by: java.lang.RuntimeException: Unable to
>>>>>>>>>>>>>> automatically infer a Coder for the Kafka Deserializer
>>>>>>>>>>>>>> class io.confluent.kafka.serializers.KafkaAvroDeserializer: no
>>>>>>>>>>>>>> coder registered for type class java.lang.Object
>>>>>>>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So far the only thing I've got working is this, where I
>>>>>>>>>>>>>> use the ByteArrayDeserializer and then parse Avro myself:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> private static KafkaAvroDecoder avroDecoder;
>>>>>>>>>>>>>> static {
>>>>>>>>>>>>>>   final Properties props = new Properties();
>>>>>>>>>>>>>>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>>>>>>>   props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>>>>>>>>>>>>>   props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>>>>>>>>   VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>>>>>>>   avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public static void main(String[] args) {
>>>>>>>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>>>>>>>   Pipeline p = Pipeline.create(options);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>>>>>>           .withBootstrapServers("kafka:9092")
>>>>>>>>>>>>>>           .withTopic("dbserver1.inventory.customers")
>>>>>>>>>>>>>>           .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>>>           .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>>>           .withoutMetadata(
>>>>>>>>>>>>>>   )
>>>>>>>>>>>>>>       .apply(Values.<byte[]>create())
>>>>>>>>>>>>>>       .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>>>>>>>>         @ProcessElement
>>>>>>>>>>>>>>         public void processElement(ProcessContext c) {
>>>>>>>>>>>>>>           Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>>>>>>>           c.output(data);
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>       }))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> It seems that KafkaAvroDeserializer implements
>>>>>>>>>>>>>>>> Deserializer<Object>, though I suppose with proper
>>>>>>>>>>>>>>>> configuration that Object will at run-time be your
>>>>>>>>>>>>>>>> desired type. Have you tried adding some Java type
>>>>>>>>>>>>>>>> casts to make it compile?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> +1, cast might be the simplest fix. Alternately you can
>>>>>>>>>>>>>>> wrap or extend KafkaAvroDeserializer as Tim suggested.
>>>>>>>>>>>>>>> It would cast the Object returned by
>>>>>>>>>>>>>>> KafkaAvroDeserializer::deserialize() to Envelope at
>>>>>>>>>>>>>>> runtime.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> I just tried quickly and see the same as you Andrew.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We're missing something obvious or else extending
>>>>>>>>>>>>>>>>> KafkaAvroDeserializer seems necessary right?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'm trying to read Avro data from a Kafka stream
>>>>>>>>>>>>>>>>>> using KafkaIO. I think it should be as simple as:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>>>>>>>>>>>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>>>>>>>>>>>>         AvroCoder.of(Envelope.class))
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Where Envelope is the name of the Avro class.
>>>>>>>>>>>>>>>>>> However, that does not compile and I get
>>>>>>>>>>>>>>>>>> the following error:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> incompatible types:
>>>>>>>>>>>>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>>>>>>>>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>>>>>>>>>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I've tried a number of variations on this theme but
>>>>>>>>>>>>>>>>>> haven't yet worked it out and am starting
>>>>>>>>>>>>>>>>>> to run out of ideas...
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The code I'm using can be found at
>>>>>>>>>>>>>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example
>>>>>>>>>>>>>>>>>> and a full environment can be created with Docker.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Andrew

Links:
1. mailto:andrew%2bb...@andrew-jones.com