Thanks Eugene, that worked perfectly!

Full final code at
https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java.
Thanks,
Andrew


On Fri, Oct 20, 2017, at 05:10 PM, Eugene Kirpichov wrote:
> This is due to Java doing type erasure in any expression that involves
> a raw type. This will compile if you extract the result of
> .apply(KafkaIO.read()...) into a local variable.
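The erasure behaviour Eugene describes can be shown without any Beam dependencies. The sketch below uses an invented `Box` builder class (not Beam's API): chaining off a raw-typed expression erases every generic method in the chain, while extracting into a parameterized local variable keeps the types.

```java
import java.util.ArrayList;
import java.util.List;

// Toy generic builder; a stand-in for KafkaIO.Read, not the real Beam API.
class Box<T> {
    private T value;
    Box<T> with(T v) { this.value = v; return this; }
    List<T> contents() {
        List<T> out = new ArrayList<>();
        out.add(value);
        return out;
    }
}

public class RawTypeDemo {
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) {
        // Raw type in the expression: erasure applies to the whole chain,
        // so contents() yields a raw List; assigning to List<String> here
        // would be a compile error.
        Box raw = new Box();
        List rawContents = raw.with("hello").contents();

        // Extracting into a parameterized variable keeps the chain typed:
        Box<String> typed = new Box<>();
        List<String> typedContents = typed.with("hello").contents();
        System.out.println(typedContents.get(0)); // hello
    }
}
```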
> On Fri, Oct 20, 2017, 1:51 AM Andrew Jones
> <andrew+beam@andrew-jones.com[1]> wrote:
>> Thanks Eugene. That does compile, although the rest of the pipeline
>> doesn't seem happy.
>> The next line is:
>> 
>> .apply(Values.<Envelope>create())
>> 
>> But that now doesn't compile with the following error:
>> 
>> /usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17] cannot find symbol
>>   symbol:   method apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
>>   location: interface org.apache.beam.sdk.values.POutput
>> 
>> I don't really understand what's wrong here. It works fine when using
>> the EnvelopeKafkaAvroDeserializer as suggested by Tim.
>> Thanks,
>> Andrew
>> 
>> 
>> On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
>>> Thanks Eugene 
>>> 
>>> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com>
>>> wrote:
>>>> Ah, nice. It works.
>>>> 
>>>> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov
>>>> <kirpic...@google.com> wrote:
>>>>> The following compiles fine:
>>>>> 
>>>>>         p.apply(KafkaIO.<String, Envelope>read()
>>>>>                 .withBootstrapServers("kafka:9092")
>>>>>                 .withTopic("dbserver1.inventory.customers")
>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>                 .withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(Envelope.class))
>>>>> 
>>>>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com>
>>>>> wrote:
>>>>>> Same for me. It does not look like there is an annotation to
>>>>>> suppress the error.
>>>>>> 
>>>>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson
>>>>>> <timrobertson...@gmail.com> wrote:
>>>>>>> Hi Eugene,
>>>>>>> 
>>>>>>> I understood that was where Andrew started and reported this. I
>>>>>>> tried and saw the same as him.
>>>>>>> 
>>>>>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer> cannot be converted to org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>>>>>> 
>>>>>>> similarly with (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov
>>>>>>> <kirpic...@google.com> wrote:
>>>>>>>> I don't think extending the class is necessary. Not sure I
>>>>>>>> understand why a simple type cast for
>>>>>>>> withValueDeserializerAndCoder doesn't work? Have you tried this?
>>>>>>>> 
>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>   .withValueDeserializerAndCoder((Deserializer<Envelope>)KafkaAvroDeserializer.class,
>>>>>>>>   AvroCoder.of(Envelope.class))
>>>>>>>> 
>>>>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson
>>>>>>>> <timrobertson...@gmail.com> wrote:
>>>>>>>>> Hi Raghu
>>>>>>>>> 
>>>>>>>>> I tried that, but with KafkaAvroDeserializer already
>>>>>>>>> implementing Deserializer<Object> I couldn't get it to work.
>>>>>>>>> I didn't spend too much time though, and agree something like
>>>>>>>>> that would be cleaner.
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Tim
>>>>>>>>> 
>>>>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi
>>>>>>>>> <rang...@google.com> wrote:
>>>>>>>>>> Thanks Tim.
>>>>>>>>>> 
>>>>>>>>>> How about extending KafkaAvroDeserializer rather than
>>>>>>>>>> AbstractKafkaAvroDeserializer?
>>>>>>>>>> 
>>>>>>>>>> The TypedKafkaAvroDeserializer class below is useful, but not
>>>>>>>>>> directly usable yet. It needs to store the actual type in the
>>>>>>>>>> Kafka consumer config to retrieve at run time. Even without
>>>>>>>>>> storing the class, it is still useful. It simplifies user code:
>>>>>>>>>> 
>>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>>>>> 
>>>>>>>>>> This should be part of the same package as KafkaAvroDeserializer
>>>>>>>>>> (surprised it is not there yet).
>>>>>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson
>>>>>>>>>> <timrobertson...@gmail.com> wrote:
>>>>>>>>>>> Happy to hear
>>>>>>>>>>> 
>>>>>>>>>>> I wonder if we could do something like this (totally
>>>>>>>>>>> untested):
>>>>>>>>>>> 
>>>>>>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public T deserialize(String s, byte[] bytes) {
>>>>>>>>>>>         return (T) this.deserialize(bytes);
>>>>>>>>>>>     }
>>>>>>>>>>> }
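The idea behind the snippet above can be sketched without any Kafka dependencies. Every class name below is an invented stand-in (not Confluent's API): an untyped base whose deserialize returns Object, and a generic subclass that narrows the result with an unchecked cast.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for AbstractKafkaAvroDeserializer: its deserialize returns Object.
class UntypedBase {
    protected Object deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Stand-in for org.apache.kafka.common.serialization.Deserializer<T>.
interface TypedDeserializer<T> {
    T deserialize(String topic, byte[] bytes);
}

// The TypedKafkaAvroDeserializer pattern: a generic subclass that casts the
// untyped parent's result. The cast is unchecked, so it is only verified when
// the caller actually uses the value.
class TypedWrapper<T> extends UntypedBase implements TypedDeserializer<T> {
    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(String topic, byte[] bytes) {
        return (T) this.deserialize(bytes); // dispatches to the byte[] overload
    }
}

public class TypedWrapperDemo {
    public static void main(String[] args) {
        TypedDeserializer<String> d = new TypedWrapper<>();
        System.out.println(d.deserialize("topic", "hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```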
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones
>>>>>>>>>>> <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>> Thanks Tim, that works!
>>>>>>>>>>>> 
>>>>>>>>>>>> Full code is:
>>>>>>>>>>>> 
>>>>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>>>>>     }
>>>>>>>>>>>> 
>>>>>>>>>>>>     @Override
>>>>>>>>>>>> 
>>>>>>>>>>>>     public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>>>         return (Envelope) this.deserialize(bytes);
>>>>>>>>>>>>     }
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void close() {}
>>>>>>>>>>>> }
>>>>>>>>>>>> 
>>>>>>>>>>>> Nicer than my solution, so I think that is the one I'm going
>>>>>>>>>>>> to go with for now.
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I also saw the same behaviour.  
>>>>>>>>>>>>> 
>>>>>>>>>>>>> It's not pretty, but perhaps try this? It was my last idea,
>>>>>>>>>>>>> but I ran out of time to try it...
>>>>>>>>>>>>> 
>>>>>>>>>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>>>>>>>>>> public class EnvelopeAvroDeserializer extends
>>>>>>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>>>> 
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>> 
>>>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> 
>>>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes,
>>>>>>>>>>>>>   Schema readerSchema) {
>>>>>>>>>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> 
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>> }
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Tim
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones
>>>>>>>>>>>>> <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>>>> Using Object doesn't work unfortunately. I get an 'Unable
>>>>>>>>>>>>>> to automatically infer a Coder' error at runtime.
>>>>>>>>>>>>>> This is the code:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>>>>>>>>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>>>>>>> It compiles, but at runtime:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Caused by: java.lang.RuntimeException: Unable to
>>>>>>>>>>>>>> automatically infer a Coder for the Kafka Deserializer class
>>>>>>>>>>>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no
>>>>>>>>>>>>>> coder registered for type class java.lang.Object
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>>>>>>>> So far the only thing I've got working is this, where I
>>>>>>>>>>>>>> use the ByteArrayDeserializer and then parse Avro myself:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>     private static KafkaAvroDecoder avroDecoder;
>>>>>>>>>>>>>>     static {
>>>>>>>>>>>>>>         final Properties props = new Properties();
>>>>>>>>>>>>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>>>>>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>>>>>>>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>>>>>>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>>>>>>                 .withBootstrapServers("kafka:9092")
>>>>>>>>>>>>>>                 .withTopic("dbserver1.inventory.customers")
>>>>>>>>>>>>>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>>>                 .withoutMetadata())
>>>>>>>>>>>>>>                 .apply(Values.<byte[]>create())
>>>>>>>>>>>>>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>>>>>>>>                     @ProcessElement
>>>>>>>>>>>>>>                     public void processElement(ProcessContext c) {
>>>>>>>>>>>>>>                         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>>>>>>>                         c.output(data);
>>>>>>>>>>>>>>                     }
>>>>>>>>>>>>>>                 }))
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov
>>>>>>>>>>>>>>> <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>> It seems that KafkaAvroDeserializer implements
>>>>>>>>>>>>>>>> Deserializer<Object>, though I suppose with proper
>>>>>>>>>>>>>>>> configuration that Object will at run-time be your
>>>>>>>>>>>>>>>> desired type. Have you tried adding some Java type
>>>>>>>>>>>>>>>> casts to make it compile?
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> +1, a cast might be the simplest fix. Alternately you can
>>>>>>>>>>>>>>> wrap or extend KafkaAvroDeserializer as Tim suggested.
>>>>>>>>>>>>>>> It would cast the Object returned by
>>>>>>>>>>>>>>> KafkaAvroDeserializer::deserialize() to Envelope at runtime.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson
>>>>>>>>>>>>>>>> <timrobertson...@gmail.com> wrote:
>>>>>>>>>>>>>>>>> I just tried quickly and see the same as you, Andrew.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> We're missing something obvious, or else extending
>>>>>>>>>>>>>>>>> KafkaAvroDeserializer seems necessary, right?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones
>>>>>>>>>>>>>>>>> <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> I'm trying to read Avro data from a Kafka stream
>>>>>>>>>>>>>>>>>> using KafkaIO. I think it should be as simple as:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>>>>>>>>>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>>>>>>>>>>>>   AvroCoder.of(Envelope.class))
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Where Envelope is the name of the Avro class. However,
>>>>>>>>>>>>>>>>>> that does not compile and I get the following error:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> incompatible types:
>>>>>>>>>>>>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>>>>>>>>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>>>>>>>>>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>>>>>>>>>>> I've tried a number of variations on this theme but
>>>>>>>>>>>>>>>>>> haven't yet worked it out and am starting to run out of ideas...
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> The code I'm using can be found at
>>>>>>>>>>>>>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example
>>>>>>>>>>>>>>>>>> and a full environment can be created with Docker.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>> 


Links:

  1. mailto:andrew%2bb...@andrew-jones.com
