Thanks Eugene

On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote:
> Ah, nice. It works.
>
> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> The following compiles fine:
>>
>>
>> p.apply(KafkaIO.<String, Envelope>read()
>>     .withBootstrapServers("kafka:9092")
>>     .withTopic("dbserver1.inventory.customers")
>>     .withKeyDeserializer(StringDeserializer.class)
>>     .withValueDeserializerAndCoder(
>>         (Class) KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>>
>>
>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> Same for me. It does not look like there is an annotation to suppress
>>> the error.
>>>
>>>
>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <
>>> timrobertson...@gmail.com> wrote:
>>>
>>>> Hi Eugene,
>>>>
>>>> I understood that was where Andrew started and reported this. I tried
>>>> and saw the same as him.
>>>>
>>>> incompatible types:
>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>> cannot be converted to
>>>> org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>>>
>>>> similarly with
>>>> (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>>>
>>>>
>>>>
>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com
>>>> > wrote:
>>>>
>>>>> I don't think extending the class is necessary. Not sure I understand
>>>>> why a simple type cast for withValueDeserializerAndCoder doesn't work?
>>>>> Have you tried this?
>>>>>
>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>     .withValueDeserializerAndCoder(
>>>>>         (Deserializer<Envelope>) KafkaAvroDeserializer.class,
>>>>>         AvroCoder.of(Envelope.class))
>>>>>
>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <
>>>>> timrobertson...@gmail.com> wrote:
>>>>>
>>>>>> Hi Raghu
>>>>>>
>>>>>> I tried that but with KafkaAvroDeserializer already implementing
>>>>>> Deserializer<Object> I couldn't get it to work... I didn't spend too
>>>>>> much time though and agree something like that would be cleaner.
>>>>>>
>>>>>> Cheers,
>>>>>> Tim
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Tim.
>>>>>>>
>>>>>>> How about extending KafkaAvroDeserializer rather
>>>>>>> than AbstractKafkaAvroDeserializer?
>>>>>>>
>>>>>>> The TypedKafkaAvroDeserializer class below is useful, but not directly
>>>>>>> usable yet. It needs to store the actual type in the Kafka consumer
>>>>>>> config so it can be retrieved at run time.
>>>>>>> Even without storing the class, it is still useful. It simplifies
>>>>>>> user code:
>>>>>>>
>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>     TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>>
>>>>>>> This should be part of the same package as KafkaAvroDeserializer
>>>>>>> (surprised it is not there yet).
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <
>>>>>>> timrobertson...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Happy to hear
>>>>>>>>
>>>>>>>> I wonder if we could do something like this (totally untested):
>>>>>>>>
>>>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>>>>     AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>>   @Override
>>>>>>>>   public T deserialize(String s, byte[] bytes) {
>>>>>>>>     return (T) this.deserialize(bytes);
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Tim, that works!
>>>>>>>>>
>>>>>>>>> Full code is:
>>>>>>>>>
>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>   @Override
>>>>>>>>>   public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>>     configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @Override
>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @Override
>>>>>>>>>   public void close() {}
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Nicer than my solution, so I think that is the one I'm going to go
>>>>>>>>> with for now.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>>
>>>>>>>>> Hi Andrew,
>>>>>>>>>
>>>>>>>>> I also saw the same behaviour.
>>>>>>>>>
>>>>>>>>> It's not pretty but perhaps try this? It was my last idea I ran
>>>>>>>>> out of time to try...
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>>>>>> public class EnvelopeAvroDeserializer extends
>>>>>>>>>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>
>>>>>>>>>   ...
>>>>>>>>>
>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   ...
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Tim
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>>>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Using Object doesn't work unfortunately.
>>>>>>>>> I get an 'Unable to automatically infer a Coder' error at runtime.
>>>>>>>>>
>>>>>>>>> This is the code:
>>>>>>>>>
>>>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>>>>     .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>>
>>>>>>>>> It compiles, but at runtime:
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.RuntimeException: Unable to automatically
>>>>>>>>> infer a Coder for the Kafka Deserializer class
>>>>>>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder
>>>>>>>>> registered for type class java.lang.Object
>>>>>>>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>>>
>>>>>>>>> So far the only thing I've got working is this, where I use the
>>>>>>>>> ByteArrayDeserializer and then parse Avro myself:
>>>>>>>>>
>>>>>>>>> private static KafkaAvroDecoder avroDecoder;
>>>>>>>>> static {
>>>>>>>>>   final Properties props = new Properties();
>>>>>>>>>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>>   props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>>>>>>>>   props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>>>   VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>>   avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> public static void main(String[] args) {
>>>>>>>>>
>>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>>   Pipeline p = Pipeline.create(options);
>>>>>>>>>
>>>>>>>>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>       .withBootstrapServers("kafka:9092")
>>>>>>>>>       .withTopic("dbserver1.inventory.customers")
>>>>>>>>>       .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>       .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>       .withoutMetadata()
>>>>>>>>>   )
>>>>>>>>>   .apply(Values.<byte[]>create())
>>>>>>>>>   .apply("ParseAvro", ParDo.of(new DoFn<byte[],
>>>>>>>>> Envelope>() {
>>>>>>>>>     @ProcessElement
>>>>>>>>>     public void processElement(ProcessContext c) {
>>>>>>>>>       Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>>       c.output(data);
>>>>>>>>>     }
>>>>>>>>>   }))
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>>
>>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <
>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>> It seems that KafkaAvroDeserializer implements
>>>>>>>>> Deserializer<Object>, though I suppose with proper configuration that
>>>>>>>>> Object will at run-time be your desired type. Have you tried adding
>>>>>>>>> some Java type casts to make it compile?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> +1, a cast might be the simplest fix. Alternatively you can wrap or
>>>>>>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the
>>>>>>>>> Object returned by KafkaAvroDeserializer::deserialize() to Envelope
>>>>>>>>> at runtime.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <
>>>>>>>>> timrobertson...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I just tried quickly and see the same as you Andrew.
>>>>>>>>> We're missing something obvious or else extending
>>>>>>>>> KafkaAvroDeserializer seems necessary right?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
>>>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I
>>>>>>>>> think it should be as simple as:
>>>>>>>>>
>>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>>>         AvroCoder.of(Envelope.class))
>>>>>>>>>
>>>>>>>>> Where Envelope is the name of the Avro class.
>>>>>>>>> However, that does not compile and I get the following error:
>>>>>>>>>
>>>>>>>>> incompatible types:
>>>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>>
>>>>>>>>> I've tried a number of variations on this theme but haven't yet
>>>>>>>>> worked it out and am starting to run out of ideas...
>>>>>>>>>
>>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>>
>>>>>>>>> The code I'm using can be found at
>>>>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a
>>>>>>>>> full environment can be created with Docker.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Andrew
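
For anyone reading the thread later, here is a minimal sketch of why the raw (Class) cast compiles when the parameterized casts do not. It uses no Kafka or Beam classes: Deserializer, ObjectDeserializer and decodeWith are hypothetical stand-ins for the real Deserializer interface, KafkaAvroDeserializer (which implements Deserializer<Object>) and a method such as withValueDeserializerAndCoder. It only illustrates the generics behaviour and is not a statement about how KafkaIO is implemented.

```java
import java.nio.charset.StandardCharsets;

// Launcher class comes first so the file can be run directly with `java`.
class RawClassCastDemo {

    // Hypothetical stand-in for a method like withValueDeserializerAndCoder:
    // it only accepts a Class whose Deserializer type argument is exactly V.
    static <V> V decodeWith(Class<? extends Deserializer<V>> type, byte[] bytes) {
        try {
            Deserializer<V> d = type.getDeclaredConstructor().newInstance();
            return d.deserialize(bytes);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static String decode(byte[] bytes) {
        // Passing ObjectDeserializer.class directly with V = String is rejected,
        // because Class<ObjectDeserializer> is not a
        // Class<? extends Deserializer<String>>. Going through the raw type
        // Class turns the error into an unchecked warning instead.
        Class<? extends Deserializer<String>> typed = (Class) ObjectDeserializer.class;
        return decodeWith(typed, bytes);
    }

    public static void main(String[] args) {
        System.out.println(decode("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

// Stand-in for org.apache.kafka.common.serialization.Deserializer (assumption).
interface Deserializer<T> {
    T deserialize(byte[] bytes);
}

// Stand-in for a deserializer declared against Object, like KafkaAvroDeserializer.
class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The cast is unchecked, so the compiler no longer verifies the value type. If the deserializer returned something other than the declared type at run time, the failure would show up as a ClassCastException where the value is first used, which is the trade-off against the typed subclass approach discussed above.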