Thanks Eugene. That does compile, although the rest of the pipeline
doesn't seem happy.
The next line is:

.apply(Values.<Envelope>create())

But that now doesn't compile with the following error:

/usr/src/kafka/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java:[54,17] cannot find symbol
  symbol:   method apply(org.apache.beam.sdk.transforms.Values<dbserver1.inventory.customers.Envelope>)
  location: interface org.apache.beam.sdk.values.POutput

Don't really understand what's wrong here. It works fine when using the
EnvelopeKafkaAvroDeserializer as suggested by Tim.
Thanks,
Andrew
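A possible explanation for the POutput error, offered as a hedged sketch. With the raw (Class) cast, withValueDeserializerAndCoder only applies through an unchecked conversion. In that case Java erases the method's return type (JLS 15.12.2.6), so the transform handed to p.apply(...) is a raw type. As a result, p.apply(...) also returns the erasure of its generic result, POutput, which has no apply method. One workaround is to do the unchecked cast once, into a correctly typed variable. The cast goes through Class<?> because the direct cast was reported as rejected in the quoted messages below. The example uses hypothetical stand-ins (Deserializer, ObjectDeserializer, Read) instead of the real Kafka and Beam classes so that it runs on its own. With the real classes, the equivalent expression would presumably be (Class<? extends Deserializer<Envelope>>) (Class<?>) KafkaAvroDeserializer.class, but that is untested against Beam.

```java
import java.nio.charset.StandardCharsets;

public class TypedCastSketch {
  // Hypothetical stand-in for Kafka's Deserializer<T> (not the real interface).
  public interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
  }

  // Hypothetical stand-in for KafkaAvroDeserializer: declared as
  // Deserializer<Object>, although at run time it returns a more specific type.
  public static class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
      return new String(data, StandardCharsets.UTF_8);
    }
  }

  // Hypothetical stand-in for KafkaIO.Read, reduced to the value type V. The
  // setter takes Class<? extends Deserializer<V>>, like the first argument of
  // withValueDeserializerAndCoder.
  public static final class Read<V> {
    private Class<? extends Deserializer<V>> valueDeserializer;

    public Read<V> withValueDeserializer(Class<? extends Deserializer<V>> cls) {
      this.valueDeserializer = cls;
      return this;
    }

    // Instantiates the configured deserializer and decodes one payload.
    public V readOne(byte[] payload) {
      try {
        Deserializer<V> d = valueDeserializer.getDeclaredConstructor().newInstance();
        return d.deserialize("topic", payload);
      } catch (ReflectiveOperationException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  // The single unchecked step, isolated in one place. It goes through Class<?>
  // because a direct cast between these Class types was reported to be rejected.
  @SuppressWarnings("unchecked")
  public static Class<? extends Deserializer<String>> stringTyped() {
    return (Class<? extends Deserializer<String>>) (Class<?>) ObjectDeserializer.class;
  }

  public static void main(String[] args) {
    // The argument already has the expected type, so no unchecked conversion
    // is needed at this call and its result keeps the type Read<String>.
    // Passing (Class) ObjectDeserializer.class here instead would make the
    // call's result the raw type Read.
    Read<String> read = new Read<String>().withValueDeserializer(stringTyped());
    System.out.println(read.readOne("hello".getBytes(StandardCharsets.UTF_8)));
  }
}
```

A similar effect should be possible by assigning the result of the raw-cast call to a variable declared with the full generic type. That assignment compiles with an unchecked warning, and the pipeline can then continue from the typed variable.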


On Fri, Oct 20, 2017, at 06:57 AM, Tim Robertson wrote:
> Thanks Eugene 
> 
> On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote:
>> Ah, nice. It works.
>> 
>> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>> The following compiles fine:
>>>
>>>         p.apply(KafkaIO.<String, Envelope>read()
>>>                 .withBootstrapServers("kafka:9092")
>>>                 .withTopic("dbserver1.inventory.customers")
>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>                 .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
>>>                         AvroCoder.of(Envelope.class)));
>>> 
>>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>>>> Same for me. It does not look like there is an annotation to
>>>> suppress the error.
>>>> 
>>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>> Hi Eugene,
>>>>>
>>>>> I understood that was where Andrew started and reported this. I
>>>>> tried and saw the same as him.
>>>>>
>>>>> incompatible types: java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>> cannot be converted to org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>>>>
>>>>> similarly with
>>>>> (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>>>>
>>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>> I don't think extending the class is necessary. I'm not sure I
>>>>>> understand why a simple type cast for withValueDeserializerAndCoder
>>>>>> doesn't work. Have you tried this?
>>>>>>
>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>   .withValueDeserializerAndCoder((Deserializer<Envelope>) KafkaAvroDeserializer.class,
>>>>>>       AvroCoder.of(Envelope.class))
>>>>>> 
>>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>> Hi Raghu,
>>>>>>>
>>>>>>> I tried that, but with KafkaAvroDeserializer already implementing
>>>>>>> Deserializer<Object> I couldn't get it to work... I didn't spend
>>>>>>> too much time on it, though, and agree something like that would be
>>>>>>> cleaner.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Tim 
>>>>>>> 
>>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>>>>>>>> Thanks Tim.
>>>>>>>>
>>>>>>>> How about extending KafkaAvroDeserializer rather than
>>>>>>>> AbstractKafkaAvroDeserializer?
>>>>>>>>
>>>>>>>> The TypedKafkaAvroDeserializer class below is useful, but not yet
>>>>>>>> directly usable. It needs to store the actual type in the Kafka
>>>>>>>> consumer config so that it can be retrieved at run time.
>>>>>>>>
>>>>>>>> Even without storing the class, it is still useful. It
>>>>>>>> simplifies user code:
>>>>>>>>
>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>     TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>>>
>>>>>>>> This should be part of the same package as KafkaAvroDeserializer
>>>>>>>> (surprised it is not there yet).
>>>>>>>>
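Raghu's point about storing the actual type in the consumer config can be sketched as follows. This is a hypothetical illustration, not Confluent's or Beam's API. The Deserializer interface and ObjectDeserializer below are stand-ins that mirror the shape of Kafka's Deserializer and of KafkaAvroDeserializer, and the config key example.value.type is invented for the example. The wrapper reads a class name in configure() and uses Class.cast, so a record of the wrong type fails immediately with a ClassCastException instead of slipping through an unchecked (T) cast. It wraps the Object-typed deserializer rather than extending it. A Java class cannot implement two different parameterizations of the same generic interface, so a subclass of something that implements Deserializer<Object> cannot also implement Deserializer<Envelope>.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ConfiguredTypeSketch {
  // Hypothetical stand-in mirroring the shape of Kafka's Deserializer<T>.
  public interface Deserializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);

    T deserialize(String topic, byte[] data);

    void close();
  }

  // Hypothetical stand-in for KafkaAvroDeserializer: statically typed as
  // Deserializer<Object>; here it simply decodes UTF-8 text.
  public static class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public Object deserialize(String topic, byte[] data) {
      return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public void close() {}
  }

  // Hypothetical config key, invented for this sketch; not a Kafka or
  // Confluent setting.
  public static final String VALUE_TYPE_CONFIG = "example.value.type";

  // Wraps (rather than extends) the Object-typed deserializer and checks each
  // result against a type named in the consumer config.
  public static class TypedDeserializer<T> implements Deserializer<T> {
    private final ObjectDeserializer delegate = new ObjectDeserializer();
    private Class<T> type;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
      delegate.configure(configs, isKey);
      Object className = configs.get(VALUE_TYPE_CONFIG);
      if (!(className instanceof String)) {
        throw new IllegalArgumentException("Missing " + VALUE_TYPE_CONFIG);
      }
      try {
        // Unchecked: the caller is trusted to name a class matching T.
        type = (Class<T>) Class.forName((String) className);
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Unknown class: " + className, e);
      }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
      // Class.cast throws ClassCastException here on a mismatch, instead of
      // an unchecked (T) cast letting the wrong type flow further downstream.
      return type.cast(delegate.deserialize(topic, data));
    }

    @Override
    public void close() {
      delegate.close();
    }
  }

  public static void main(String[] args) {
    Map<String, Object> configs = new HashMap<>();
    configs.put(VALUE_TYPE_CONFIG, "java.lang.String");
    TypedDeserializer<String> deserializer = new TypedDeserializer<>();
    deserializer.configure(configs, false);
    System.out.println(deserializer.deserialize("topic", "hello".getBytes(StandardCharsets.UTF_8)));
    deserializer.close();
  }
}
```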
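>>>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>> Happy to hear.
>>>>>>>>>
>>>>>>>>> I wonder if we could do something like this (totally
>>>>>>>>> untested):
>>>>>>>>>
>>>>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>>>>>     AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>>>
>>>>>>>>>     // configure() and close() as in the working EnvelopeKafkaAvroDeserializer
>>>>>>>>>     // below; configure() hands the schema registry settings to the base class.
>>>>>>>>>     @Override
>>>>>>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     // Unchecked cast: the decoded Avro object is assumed to be a T.
>>>>>>>>>     @Override
>>>>>>>>>     @SuppressWarnings("unchecked")
>>>>>>>>>     public T deserialize(String s, byte[] bytes) {
>>>>>>>>>         return (T) this.deserialize(bytes);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void close() {}
>>>>>>>>> }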
>>>>>>>>> 
>>>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>> Thanks Tim, that works!
>>>>>>>>>> 
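>>>>>>>>>> Full code is:
>>>>>>>>>>
>>>>>>>>>> import dbserver1.inventory.customers.Envelope;
>>>>>>>>>> import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
>>>>>>>>>> import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
>>>>>>>>>> import java.util.Map;
>>>>>>>>>> import org.apache.kafka.common.serialization.Deserializer;
>>>>>>>>>>
>>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>>>         AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>     @Override
>>>>>>>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>>>         // Hand the consumer config (e.g. schema registry URL) to the base class.
>>>>>>>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>         // The base class returns Object. Assumes specific.avro.reader=true;
>>>>>>>>>>         // otherwise a GenericRecord comes back and this cast fails.
>>>>>>>>>>         return (Envelope) this.deserialize(bytes);
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void close() {}
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Nicer than my solution, so I think that is the one I'm going to
>>>>>>>>>> go with for now.
>>>>>>>>>>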
>>>>>>>>>> Thanks,
>>>>>>>>>> Andrew
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>>>> Hi Andrew,
>>>>>>>>>>> 
>>>>>>>>>>> I also saw the same behaviour.  
>>>>>>>>>>> 
>>>>>>>>>>> It's not pretty, but perhaps try this? It was my last idea, and I
>>>>>>>>>>> ran out of time to try it...
>>>>>>>>>>>
>>>>>>>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in
>>>>>>>>>>> // deserialize
>>>>>>>>>>> public class EnvelopeAvroDeserializer extends
>>>>>>>>>>>         AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>>>
>>>>>>>>>>>   ...
>>>>>>>>>>>
>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes,
>>>>>>>>>>>       Schema readerSchema) {
>>>>>>>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   ...
>>>>>>>>>>>
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Tim
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>> Using Object doesn't work, unfortunately. I get an 'Unable
>>>>>>>>>>>> to automatically infer a Coder' error at runtime.
>>>>>>>>>>>>
>>>>>>>>>>>> This is the code:
>>>>>>>>>>>>
>>>>>>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>>>>>>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>>>>>
>>>>>>>>>>>> It compiles, but at runtime:
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.lang.RuntimeException: Unable to
>>>>>>>>>>>> automatically infer a Coder for the Kafka Deserializer
>>>>>>>>>>>> class io.confluent.kafka.serializers.KafkaAvroDeserializer:
>>>>>>>>>>>> no coder registered for type class java.lang.Object
>>>>>>>>>>>>     at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>>>>>>
>>>>>>>>>>>> So far the only thing I've got working is this, where I use
>>>>>>>>>>>> the ByteArrayDeserializer and then parse Avro myself:
>>>>>>>>>>>>
>>>>>>>>>>>>     private static KafkaAvroDecoder avroDecoder;
>>>>>>>>>>>>     static {
>>>>>>>>>>>>         final Properties props = new Properties();
>>>>>>>>>>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://registry:8081");
>>>>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>>>>>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>>>>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>
>>>>>>>>>>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>>>>
>>>>>>>>>>>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>>>>                 .withBootstrapServers("kafka:9092")
>>>>>>>>>>>>                 .withTopic("dbserver1.inventory.customers")
>>>>>>>>>>>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>>>>                 .withoutMetadata())
>>>>>>>>>>>>                 .apply(Values.<byte[]>create())
>>>>>>>>>>>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>>>>>>                     @ProcessElement
>>>>>>>>>>>>                     public void processElement(ProcessContext c) {
>>>>>>>>>>>>                         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>>>>>                         c.output(data);
>>>>>>>>>>>>                     }
>>>>>>>>>>>>                 }))
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Andrew
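The runtime error in Andrew's message above is a coder-inference failure. Without an explicit coder, KafkaIO has to derive the value type from the deserializer class, and KafkaAvroDeserializer only declares Deserializer<Object>. The sketch below uses hypothetical stand-in classes and plain java.lang.reflect to show what such a reflective lookup can see. It is an assumption about the general approach, not KafkaIO's actual inferCoder code. It suggests why passing the coder explicitly avoids the problem, for example AvroCoder.of(Envelope.class) with withValueDeserializerAndCoder as elsewhere in this thread.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;

public class DeserializerTypeSketch {
  // Hypothetical stand-in for Kafka's Deserializer<T>.
  public interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
  }

  // Stand-in for KafkaAvroDeserializer, which declares Deserializer<Object>.
  public static class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
      return new String(data, StandardCharsets.UTF_8);
    }
  }

  // Stand-in for a class that declares a concrete value type, in the way
  // EnvelopeKafkaAvroDeserializer declares Deserializer<Envelope>.
  public static class StringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
      return new String(data, StandardCharsets.UTF_8);
    }
  }

  // Returns the type argument written in "implements Deserializer<...>" on cls
  // or one of its superclasses, or null if there is no such declaration.
  // Type variables inherited from a generic superclass are not resolved.
  public static Type deserializedType(Class<?> cls) {
    for (Class<?> c = cls; c != null; c = c.getSuperclass()) {
      for (Type t : c.getGenericInterfaces()) {
        if (t instanceof ParameterizedType) {
          ParameterizedType p = (ParameterizedType) t;
          if (p.getRawType() == Deserializer.class) {
            return p.getActualTypeArguments()[0];
          }
        }
      }
    }
    return null;
  }

  public static void main(String[] args) {
    // For the first class, Object is all a coder lookup would have to go on.
    System.out.println(deserializedType(ObjectDeserializer.class)); // class java.lang.Object
    System.out.println(deserializedType(StringDeserializer.class)); // class java.lang.String
  }
}
```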
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>> It seems that KafkaAvroDeserializer implements
>>>>>>>>>>>>>> Deserializer<Object>, though I suppose with proper
>>>>>>>>>>>>>> configuration that Object will at run-time be your
>>>>>>>>>>>>>> desired type. Have you tried adding some Java type casts
>>>>>>>>>>>>>> to make it compile?
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1, a cast might be the simplest fix. Alternatively, you can
>>>>>>>>>>>>> wrap or extend KafkaAvroDeserializer as Tim suggested. It
>>>>>>>>>>>>> would cast the Object returned by
>>>>>>>>>>>>> KafkaAvroDeserializer::deserialize() to Envelope at
>>>>>>>>>>>>> runtime.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com> wrote:
>>>>>>>>>>>>>>> I just tried quickly and see the same as you, Andrew.
>>>>>>>>>>>>>>> Either we're missing something obvious, or extending
>>>>>>>>>>>>>>> KafkaAvroDeserializer seems necessary, right?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <andrew+beam@andrew-jones.com> wrote:
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm trying to read Avro data from a Kafka stream using
>>>>>>>>>>>>>>>> KafkaIO. I think it should be as simple as:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>>>>>>>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>>>>>>>>>>       AvroCoder.of(Envelope.class))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Where Envelope is the name of the Avro class. However,
>>>>>>>>>>>>>>>> that does not compile and I get the following error:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> incompatible types:
>>>>>>>>>>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>>>>>>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>>>>>>>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've tried a number of variations on this theme but
>>>>>>>>>>>>>>>> haven't yet worked it out and am starting to run out of ideas...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The code I'm using can be found at
>>>>>>>>>>>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example
>>>>>>>>>>>>>>>> and a full environment can be created with Docker.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Andrew
>>>>>>>>>>>> 
>>>>>>>>>> 
