Thanks Eugene. That does compile, although the rest of the pipeline doesn't seem happy. The next line is:
    .apply(Values.<Envelope>create())

But that now doesn't compile, failing with the following error:

/usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17] cannot find symbol
  symbol:   method apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
  location: interface org.apache.beam.sdk.values.POutput

I don't really understand what's wrong here. It works fine when using the EnvelopeKafkaAvroDeserializer as suggested by Tim.

Thanks,
Andrew

On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
> Thanks Eugene
>
> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote:
>> Ah, nice. It works.
>>
>> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>> The following compiles fine:
>>>
>>> p.apply(KafkaIO.<String, Envelope>read()
>>>     .withBootstrapServers("kafka:9092")
>>>     .withTopic("dbserver1.inventory.customers")
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>>>
>>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>>>> Same for me. It does not look like there is an annotation to suppress the error.
>>>>
>>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>> Hi Eugene,
>>>>>
>>>>> I understood that was where Andrew started and reported this. I tried and saw the same as him.
>>>>>
>>>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>>>>
>>>>> similarly with
>>>>> (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>>>>
>>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>> I don't think extending the class is necessary. Not sure I understand why a simple type cast for withValueDeserializerAndCoder doesn't work? Have you tried this?
>>>>>>
>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>     .withValueDeserializerAndCoder((Deserializer<Envelope>) KafkaAvroDeserializer.class,
>>>>>>         AvroCoder.of(Envelope.class))
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>> Hi Raghu
>>>>>>>
>>>>>>> I tried that but with KafkaAvroDeserializer already implementing Deserializer<Object> I couldn't get it to work... I didn't spend too much time on it though, and agree something like that would be cleaner.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Tim
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>>>>> Thanks Tim.
>>>>>>>>
>>>>>>>> How about extending KafkaAvroDeserializer rather than AbstractKafkaAvroDeserializer?
>>>>>>>>
>>>>>>>> The TypedKafkaAvroDeserializer class below is useful, but not directly usable yet. It needs to store the actual type in the Kafka consumer config so it can be retrieved at run time.
>>>>>>>> Even without storing the class, it is still useful. It simplifies user code:
>>>>>>>>
>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>>>
>>>>>>>> This should be part of the same package as KafkaAvroDeserializer (surprised it is not there yet).
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>> Happy to hear
>>>>>>>>>
>>>>>>>>> I wonder if we could do something like this (totally untested):
>>>>>>>>>
>>>>>>>>> public class TypedKafkaAvroDeserializer<T> extends AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>>>
>>>>>>>>>   @Override
>>>>>>>>>   public T deserialize(String s, byte[] bytes) {
>>>>>>>>>     return (T) this.deserialize(bytes);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>> Thanks Tim, that works!
>>>>>>>>>>
>>>>>>>>>> Full code is:
>>>>>>>>>>
>>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>   @Override
>>>>>>>>>>   public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>>>     configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   @Override
>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   @Override
>>>>>>>>>>   public void close() {}
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Nicer than my solution, so I think that is the one I'm going to go with for now.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>
>>>>>>>>>>> I also saw the same behaviour.
>>>>>>>>>>>
>>>>>>>>>>> It's not pretty, but perhaps try this? It was my last idea, and I ran out of time to try it...
>>>>>>>>>>>
>>>>>>>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>>>>>>>> public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>>
>>>>>>>>>>>   ...
>>>>>>>>>>>
>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>>>>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   ...
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Tim
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>> Using Object doesn't work, unfortunately. I get an 'Unable to automatically infer a Coder' error at runtime.
>>>>>>>>>>>>
>>>>>>>>>>>> This is the code:
>>>>>>>>>>>>
>>>>>>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>>>>>>>     .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>>>>>
>>>>>>>>>>>> It compiles, but at runtime:
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.lang.RuntimeException: Unable to automatically infer a Coder for the Kafka Deserializer class io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder registered for type class java.lang.Object
>>>>>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>>>>>>
>>>>>>>>>>>> So far the only thing I've got working is this, where I use the ByteArrayDeserializer and then parse Avro myself:
>>>>>>>>>>>>
>>>>>>>>>>>> private static KafkaAvroDecoder avroDecoder;
>>>>>>>>>>>> static {
>>>>>>>>>>>>   final Properties props = new Properties();
>>>>>>>>>>>>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>>>>>   props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>>>>>>>>>>>   props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>>>>>>   VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>>>>>   avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> public static void main(String[] args) {
>>>>>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>>>>>   Pipeline p = Pipeline.create(options);
>>>>>>>>>>>>
>>>>>>>>>>>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>>>>       .withBootstrapServers("kafka:9092")
>>>>>>>>>>>>       .withTopic("dbserver1.inventory.customers")
>>>>>>>>>>>>       .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>       .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>       .withoutMetadata()
>>>>>>>>>>>>   )
>>>>>>>>>>>>   .apply(Values.<byte[]>create())
>>>>>>>>>>>>   .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>     public void processElement(ProcessContext c) {
>>>>>>>>>>>>       Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>>>>>       c.output(data);
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }))
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>> It seems that KafkaAvroDeserializer implements Deserializer<Object>, though I suppose with proper configuration that Object will at run-time be your desired type. Have you tried adding some Java type casts to make it compile?
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1, a cast might be the simplest fix. Alternatively, you can wrap or extend KafkaAvroDeserializer as Tim suggested. It would cast the Object returned by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>>>>>>>> I just tried quickly and see the same as you, Andrew.
>>>>>>>>>>>>>>> We're missing something obvious, or else extending KafkaAvroDeserializer seems necessary, right?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think it should be as simple as:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>>>>>>>>>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Where Envelope is the name of the Avro class. However, that does not compile and I get the following error:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've tried a number of variations on this theme but haven't yet worked it out and am starting to run out of ideas...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The code I'm using can be found at https://github.com/andrewrjones/debezium-kafka-beam-example and a full environment can be created with Docker.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Andrew
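A likely explanation for the `cannot find symbol ... location: interface org.apache.beam.sdk.values.POutput` error at the top of the thread is Java's rule for unchecked method invocation (JLS 15.12.2.6): when a call is applicable only through an unchecked conversion, such as passing the raw `(Class) KafkaAvroDeserializer.class`, the call's return type is erased. `withValueDeserializerAndCoder(...)` then yields a raw `KafkaIO.Read`, `withoutMetadata()` a raw `PTransform`, and `p.apply(...)` the erasure of its output type, `POutput`, which has no `apply` method. One possible workaround, not verified against Beam here, is to cast through `Class<?>` so the argument has the exact parameterized type, e.g. `(Class<? extends Deserializer<Envelope>>) (Class<?>) KafkaAvroDeserializer.class`. The sketch below reproduces the effect without Beam or Kafka; `Deserializer`, `ObjectDeserializer`, `Read` and `RawCastDemo` are hypothetical stand-ins, not the real classes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins, NOT the real Kafka/Beam classes: just enough structure
// to reproduce the typing problem discussed in the thread.
interface Deserializer<T> {
  T deserialize(byte[] bytes);
}

// Plays the role of KafkaAvroDeserializer: declared as Deserializer<Object>,
// although at run time it hands back a more specific type (here a String).
class ObjectDeserializer implements Deserializer<Object> {
  public Object deserialize(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

// Plays the role of KafkaIO.Read<K, V>: a builder whose V must survive the chain.
class Read<V> {
  private Class<? extends Deserializer<V>> valueDeserializer;

  Read<V> withValueDeserializer(Class<? extends Deserializer<V>> cls) {
    this.valueDeserializer = cls;
    return this;
  }

  // Instantiates the configured class reflectively and decodes one value.
  V decode(byte[] bytes) {
    try {
      return valueDeserializer.getDeclaredConstructor().newInstance().deserialize(bytes);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}

class RawCastDemo {
  // Casting through Class<?> is an unchecked cast (a warning, not an error) and
  // yields a fully parameterized Class, so chained calls keep their generic types.
  @SuppressWarnings("unchecked")
  static <V> Class<? extends Deserializer<V>> asDeserializerOf(Class<?> cls) {
    return (Class<? extends Deserializer<V>>) cls;
  }

  static String typedDecode(byte[] bytes) {
    Read<String> read = new Read<String>()
        .withValueDeserializer(RawCastDemo.<String>asDeserializerOf(ObjectDeserializer.class));
    // The static type is still Read<String>, so decode(...) returns String.
    return read.decode(bytes);
  }

  // By contrast, a raw cast makes the call applicable only via unchecked conversion,
  // so its result type is erased to the raw Read and decode(...) returns Object:
  //   String s = new Read<String>()
  //       .withValueDeserializer((Class) ObjectDeserializer.class).decode(bytes);
  //   // error: incompatible types: Object cannot be converted to String
}
```

The double cast only silences the compiler; as with Tim's and Andrew's subclasses, the value is still whatever the deserializer returns at run time, so a misconfigured reader would fail later with a `ClassCastException`.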
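Raghu notes that the generic `TypedKafkaAvroDeserializer<T>` idea would need the target type stored in the consumer config so it can be recovered at run time. A minimal sketch of that idea, assuming the deserializer is created from its class and then receives the consumer's config map through `configure(...)`, as Kafka's own consumer does for deserializer classes: `Deserializer` here only mirrors the shape of Kafka's interface, `UntypedDecoder` is a hypothetical stand-in for `KafkaAvroDeserializer`, and the `example.value.type` key is hypothetical, not a real Kafka or Confluent setting.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in mirroring the shape of Kafka's Deserializer<T> interface.
interface Deserializer<T> {
  void configure(Map<String, ?> configs, boolean isKey);
  T deserialize(String topic, byte[] bytes);
  void close();
}

// Hypothetical untyped decoder playing the role of KafkaAvroDeserializer.
class UntypedDecoder {
  Object decode(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

// Sketch of the TypedKafkaAvroDeserializer idea: the target type is read from the
// config in configure(), and every result is checked with Class.cast.
class TypedDeserializer<T> implements Deserializer<T> {
  // Hypothetical config key; not a real Kafka or Confluent setting.
  static final String VALUE_TYPE_CONFIG = "example.value.type";

  private final UntypedDecoder decoder = new UntypedDecoder();
  private Class<T> type;

  @Override
  @SuppressWarnings("unchecked")
  public void configure(Map<String, ?> configs, boolean isKey) {
    Object name = configs.get(VALUE_TYPE_CONFIG);
    if (name == null) {
      throw new IllegalArgumentException("missing " + VALUE_TYPE_CONFIG);
    }
    try {
      type = (Class<T>) Class.forName(name.toString());
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("unknown type " + name, e);
    }
  }

  @Override
  public T deserialize(String topic, byte[] bytes) {
    // Class.cast fails here with a ClassCastException naming both types,
    // instead of an unchecked (T) cast that would only fail later downstream.
    return type.cast(decoder.decode(bytes));
  }

  @Override
  public void close() {}
}
```

Checking with `Class.cast` is a design choice: it surfaces a wrong schema or reader configuration at the deserializer, rather than in whichever pipeline step first touches the value.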
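Andrew's working fallback reads raw `byte[]` values and decodes them with `KafkaAvroDecoder`. That decoder is needed because records written by Confluent's Avro serializer are not bare Avro: each value starts with a magic byte `0` and a 4-byte big-endian Schema Registry ID, followed by the Avro binary body, and the decoder uses the ID to look up the writer schema. The stdlib-only sketch below just splits that 5-byte header from the body; `WireFormat` and `Framed` are hypothetical names, and actually decoding the body would still need the Avro library and the registered schema.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: splits a Confluent-framed record into schema ID and Avro body.
final class WireFormat {
  static final byte MAGIC_BYTE = 0x0;
  static final int HEADER_SIZE = 1 + Integer.BYTES; // magic byte + 4-byte schema ID

  // Result holder: the Schema Registry ID and the remaining Avro-encoded bytes.
  static final class Framed {
    final int schemaId;
    final byte[] avroBody;

    Framed(int schemaId, byte[] avroBody) {
      this.schemaId = schemaId;
      this.avroBody = avroBody;
    }
  }

  static Framed parse(byte[] record) {
    if (record == null || record.length < HEADER_SIZE) {
      throw new IllegalArgumentException("record shorter than the 5-byte header");
    }
    ByteBuffer buf = ByteBuffer.wrap(record); // ByteBuffer is big-endian by default
    byte magic = buf.get();
    if (magic != MAGIC_BYTE) {
      throw new IllegalArgumentException("unexpected magic byte " + magic);
    }
    int schemaId = buf.getInt();
    byte[] body = Arrays.copyOfRange(record, HEADER_SIZE, record.length);
    return new Framed(schemaId, body);
  }

  private WireFormat() {}
}
```

For example, the record `{0, 0, 0, 1, 2, 10, 20}` parses to schema ID 258 (`0x00000102`) and a two-byte Avro body `{10, 20}`.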