Thanks Eugene. That does compile, although the rest of the pipeline
doesn't seem happy.
The next line is:


But that now doesn't compile with the following error:

ava:[54,17] cannot find symbol  symbol:   method 
  1.inventory.customers.Envelope>)  location: interface 

Don't really understand what's wrong here. It works fine when using the
EnvelopeKafkaAvroDeserializer as suggested by Tim.
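
For reference, here is a minimal sketch of the two approaches discussed in this thread: a typed subclass of the abstract base, and an unchecked double cast of the Class literal. Every type in it (Deserializer, Envelope, AbstractObjectDeserializer, ObjectDeserializer, EnvelopeDeserializer, instantiate, uncheckedEnvelopeType) is a hypothetical stand-in so that it compiles with the JDK alone. It only illustrates the Java typing rules involved, not the behaviour of KafkaIO or the Confluent classes.

```java
import java.nio.charset.StandardCharsets;

// Self-contained sketch: all types below are hypothetical stand-ins, not the
// real Kafka, Confluent or Beam classes.
class DeserializerTypingSketch {

    /** Stand-in for org.apache.kafka.common.serialization.Deserializer. */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] bytes);
    }

    /** Stand-in for the Avro-generated Envelope class. */
    static final class Envelope {
        final String payload;
        Envelope(String payload) { this.payload = payload; }
    }

    /** Stand-in for AbstractKafkaAvroDeserializer: untyped decoding only. */
    static class AbstractObjectDeserializer {
        protected Object deserialize(byte[] bytes) {
            return new Envelope(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    /** Stand-in for KafkaAvroDeserializer, declared as Deserializer<Object>. */
    static class ObjectDeserializer extends AbstractObjectDeserializer
            implements Deserializer<Object> {
        @Override
        public Object deserialize(String topic, byte[] bytes) {
            return deserialize(bytes);
        }
    }

    /** Approach 1: extend the abstract base and fix the value type. */
    static class EnvelopeDeserializer extends AbstractObjectDeserializer
            implements Deserializer<Envelope> {
        @Override
        public Envelope deserialize(String topic, byte[] bytes) {
            return (Envelope) deserialize(bytes);
        }
    }

    /** Stand-in for an API that takes a typed Class, like withValueDeserializerAndCoder. */
    static <T> Deserializer<T> instantiate(Class<? extends Deserializer<T>> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type, e);
        }
    }

    /** Approach 2: go through Class<?> so the cast is unchecked rather than rejected. */
    @SuppressWarnings("unchecked")
    static Class<? extends Deserializer<Envelope>> uncheckedEnvelopeType() {
        // A direct cast from Class<ObjectDeserializer> is an "incompatible types" error.
        return (Class<? extends Deserializer<Envelope>>) (Class<?>) ObjectDeserializer.class;
    }

    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
        Envelope viaSubclass = instantiate(EnvelopeDeserializer.class).deserialize("topic", bytes);
        Envelope viaCast = instantiate(uncheckedEnvelopeType()).deserialize("topic", bytes);
        System.out.println(viaSubclass.payload + " " + viaCast.payload); // prints "hello hello"
    }
}
```

Unlike a raw (Class) argument, the double cast keeps the method's generic return type. With a raw argument javac needs an unchecked conversion and erases the return type of the call, so anything chained after it loses its type arguments. That may be related to the "cannot find symbol" error above, but the error is too truncated to confirm it.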

On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
> Thanks Eugene
>
> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi wrote:
>> Ah, nice. It works.
>>
>> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov wrote:
>>> The following compiles fine:
>>>         p.apply(KafkaIO.<String, Envelope>read()
>>>                 .withBootstrapServers("kafka:9092")
>>>                 .withTopic("dbserver1.inventory.customers")
>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>                 .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
>>>                         AvroCoder.of(Envelope.class))
>>>
>>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi wrote:
>>>> Same for me. It does not look like there is an annotation to
>>>> suppress the error.
>>>>
>>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson wrote:
>>>>> Hi Eugene,
>>>>> I understood that was where Andrew started and reported this. I
>>>>> tried and saw the same as him.
>>>>>
>>>>> incompatible types:
>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>> cannot be converted to
>>>>> org.apache.kafka.common.serialization.Deserializer<
>>>>> lope>
>>>>>
>>>>> similarly with
>>>>> (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>>>>
>>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov wrote:
>>>>>> I don't think extending the class is necessary. Not sure I
>>>>>> understand why a simple type casting for withDeserializerAndCoder
>>>>>> doesn't work? Have you tried this?
>>>>>>
>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>   .withValueDeserializerAndCoder((Deserializer<Envelope>) KafkaAvroDeserializer.class,
>>>>>>       AvroCoder.of(Envelope.class))
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson wrote:
>>>>>>> Hi Raghu
>>>>>>> I tried that but with KafkaAvroDeserializer already implementing
>>>>>>> Deserializer<Object> I couldn't get it to work... I didn't spend
>>>>>>> too much time though and agree something like that would be
>>>>>>> cleaner.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Tim 
>>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi wrote:
>>>>>>>> Thanks Tim.
>>>>>>>> How about extending KafkaAvroDeserializer rather than
>>>>>>>> AbstractKafkaAvroDeserializer?
>>>>>>>>
>>>>>>>> TypedKafkaAvroDeserializer class below is useful, but not
>>>>>>>> directly usable by the yet. It needs to store the actual type
>>>>>>>> in Kafka consumer config to retrieve at run time.
>>>>>>>> Even without storing the class, it is still useful. It
>>>>>>>> simplifies user code:
>>>>>>>>
>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>>>
>>>>>>>> This should be part of same package as KafkaAvroDeserializer
>>>>>>>> (surprised it is not there yet).
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson wrote:
>>>>>>>>> Happy to hear
>>>>>>>>> I wonder if we could do something like this (totally
>>>>>>>>> untested):
>>>>>>>>>
>>>>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>>>     @Override
>>>>>>>>>     public T deserialize(String s, byte[] bytes) {
>>>>>>>>>         return (T) this.deserialize(bytes);
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones wrote:
>>>>>>>>>> Thanks Tim, that works!
>>>>>>>>>> Full code is:
>>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>>> AbstractKafkaAvroDeserializer implements
>>>>>>>>>> Deserializer<Envelope> {
>>>>>>>>>>     @Override
>>>>>>>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>>>     }
>>>>>>>>>>     @Override
>>>>>>>>>>     public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>         return (Envelope) this.deserialize(bytes);
>>>>>>>>>>     }
>>>>>>>>>>     @Override
>>>>>>>>>>     public void close() {}
>>>>>>>>>> }
>>>>>>>>>> Nicer than my solution so think that is the one I'm going to
>>>>>>>>>> go with for now.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>> I also saw the same behaviour.  
>>>>>>>>>>> It's not pretty but perhaps try this? It was my last idea I
>>>>>>>>>>> ran out of time to try...
>>>>>>>>>>>
>>>>>>>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in
>>>>>>>>>>> // deserialize
>>>>>>>>>>> public class EnvelopeAvroDeserializer extends
>>>>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>>   ...
>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>>>   }
>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes,
>>>>>>>>>>>   Schema readerSchema) {
>>>>>>>>>>>     return (Envelope) this.deserialize(bytes,
>>>>>>>>>>>     readerSchema);
>>>>>>>>>>>   }
>>>>>>>>>>>   ...
>>>>>>>>>>> }
>>>>>>>>>>> Tim
>>>>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones wrote:
>>>>>>>>>>>> Using Object doesn't work unfortunately. I get an 'Unable
>>>>>>>>>>>> to automatically infer a Coder' error at runtime.
>>>>>>>>>>>>
>>>>>>>>>>>> This is the code:
>>>>>>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>>>>>>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>>>>>
>>>>>>>>>>>> It compiles, but at runtime:
>>>>>>>>>>>> Caused by: java.lang.RuntimeException: Unable to
>>>>>>>>>>>> automatically infer a Coder for the Kafka Deserializer
>>>>>>>>>>>> class io.confluent.kafka.serializers.KafkaAvroDeserializer:
>>>>>>>>>>>> no coder registered for type class java.lang.Object
>>>>>>>>>>>> at .java:1696)
>>>>>>>>>>>>
>>>>>>>>>>>> So far the only thing I've got working is this, where I use
>>>>>>>>>>>> the ByteArrayDeserializer and then parse Avro myself:
>>>>>>>>>>>>
>>>>>>>>>>>>     private static KafkaAvroDecoder avroDecoder;
>>>>>>>>>>>>     static {
>>>>>>>>>>>>         final Properties props = new Properties();
>>>>>>>>>>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>>>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>>>>>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>>>>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>>>>>     }
>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>>>>                 .withBootstrapServers("kafka:9092")
>>>>>>>>>>>>                 .withTopic("dbserver1.inventory.customers")
>>>>>>>>>>>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>                 .withoutMetadata())
>>>>>>>>>>>>                 .apply(Values.<byte[]>create())
>>>>>>>>>>>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>>>>>>                     @ProcessElement
>>>>>>>>>>>>                     public void processElement(ProcessContext c) {
>>>>>>>>>>>>                         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>>>>>                         c.output(data);
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                 }))
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
>>>>>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov wrote:
>>>>>>>>>>>>>> It seems that KafkaAvroDeserializer implements
>>>>>>>>>>>>>> Deserializer<Object>, though I suppose with proper
>>>>>>>>>>>>>> configuration that Object will at run-time be your
>>>>>>>>>>>>>> desired type. Have you tried adding some Java type casts
>>>>>>>>>>>>>> to make it compile?
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1, cast might be the simplest fix. Alternately you can
>>>>>>>>>>>>> wrap or extend KafkaAvroDeserializer as Tim suggested. It
>>>>>>>>>>>>> would cast the Object returned by
>>>>>>>>>>>>> KafkaAvroDeserializer::deserialize() to Envelope at
>>>>>>>>>>>>> runtime.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson wrote:
>>>>>>>>>>>>>>> I just tried quickly and see the same as you Andrew.
>>>>>>>>>>>>>>> We're missing something obvious or else extending
>>>>>>>>>>>>>>> KafkaAvroDeserializer seems necessary right?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones wrote:
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>> I'm trying to read Avro data from a Kafka stream using
>>>>>>>>>>>>>>>> KafkaIO. I think it should be as simple as:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>>>>>>>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>>>>>>>>>>       AvroCoder.of(Envelope.class))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Where Envelope is the name of the Avro class. However,
>>>>>>>>>>>>>>>> that does not compile and I get the following error:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> incompatible types:
>>>>>>>>>>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>>>>>>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>>>>>>>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've tried a number of variations on this theme but
>>>>>>>>>>>>>>>> haven't yet worked it out and am starting to run out of
>>>>>>>>>>>>>>>> ideas...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The code I'm using can be found at
>>>>>>>>>>>>>>>> and a full environment can be created with Docker.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Andrew
