Thanks Eugene

On Thu, Oct 19, 2017 at 9:36 PM, Raghu Angadi <rang...@google.com> wrote:

> Ah, nice. It works.
>
> On Thu, Oct 19, 2017 at 1:44 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> The following compiles fine:
>>
>>
>>         p.apply(KafkaIO.<String, Envelope>read()
>>                 .withBootstrapServers("kafka:9092")
>>                 .withTopic("dbserver1.inventory.customers")
>>                 .withKeyDeserializer(StringDeserializer.class)
>>                 .withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
>>                         AvroCoder.of(Envelope.class))
>>
>>
>> On Thu, Oct 19, 2017 at 12:21 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> Same for me. It does not look like there is an annotation to suppress
>>> the error.
>>>
>>>
>>> On Thu, Oct 19, 2017 at 12:18 PM, Tim Robertson <
>>> timrobertson...@gmail.com> wrote:
>>>
>>>> Hi Eugene,
>>>>
>>>> I understood that was where Andrew started and reported this.  I tried
>>>> and saw the same as him.
>>>>
>>>> incompatible types:
>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>> cannot be converted to
>>>> org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>
>>>>
>>>> similarly with
>>>> (Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
>>>>
>>>>
>>>>
>>>> On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com
>>>> > wrote:
>>>>
>>>>> I don't think extending the class is necessary. I'm not sure I understand
>>>>> why a simple type cast for withValueDeserializerAndCoder doesn't work.
>>>>> Have you tried this?
>>>>>
>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>   .withValueDeserializerAndCoder(
>>>>>       (Deserializer<Envelope>) KafkaAvroDeserializer.class,
>>>>>       AvroCoder.of(Envelope.class))
>>>>>
>>>>> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <
>>>>> timrobertson...@gmail.com> wrote:
>>>>>
>>>>>> Hi Raghu
>>>>>>
>>>>>> I tried that but with KafkaAvroDeserializer already implementing
>>>>>> Deserializer<Object> I couldn't get it to work... I didn't spend too
>>>>>> much time though and agree something like that would be cleaner.
>>>>>>
>>>>>> Cheers,
>>>>>> Tim
>>>>>>
>>>>>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks Tim.
>>>>>>>
>>>>>>> How about extending KafkaAvroDeserializer rather
>>>>>>> than AbstractKafkaAvroDeserializer?
>>>>>>>
>>>>>>> The TypedKafkaAvroDeserializer class below is useful, but not directly
>>>>>>> usable yet. It needs to store the actual type in the Kafka consumer
>>>>>>> config so that it can be retrieved at run time.
>>>>>>> Even without storing the class, it is still useful, because it
>>>>>>> simplifies user code:
>>>>>>>
>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>>>>>
>>>>>>> This should be part of same package as KafkaAvroDeserializer
>>>>>>> (surprised it is not there yet).
>>>>>>>
>>>>>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <
>>>>>>> timrobertson...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Happy to hear
>>>>>>>>
>>>>>>>> I wonder if we could do something like this (totally untested):
>>>>>>>>
>>>>>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>>>>>     @Override
>>>>>>>>     public T deserialize(String s, byte[] bytes) {
>>>>>>>>         return (T) this.deserialize(bytes);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Tim, that works!
>>>>>>>>>
>>>>>>>>> Full code is:
>>>>>>>>>
>>>>>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>>>>>     @Override
>>>>>>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>         return (Envelope) this.deserialize(bytes);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void close() {}
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> It's nicer than my solution, so I think that's the one I'm going to
>>>>>>>>> go with for now.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>>>>>
>>>>>>>>> Hi Andrew,
>>>>>>>>>
>>>>>>>>> I also saw the same behaviour.
>>>>>>>>>
>>>>>>>>> It's not pretty, but perhaps try this? It was my last idea, which I
>>>>>>>>> ran out of time to try...
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>>>>>> public class EnvelopeAvroDeserializer extends AbstractKafkaAvroDeserializer
>>>>>>>>>     implements Deserializer<Envelope> {
>>>>>>>>>
>>>>>>>>>   ...
>>>>>>>>>
>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   ...
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>  Tim
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>>>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Using Object doesn't work unfortunately. I get an 'Unable to
>>>>>>>>> automatically infer a Coder' error at runtime.
>>>>>>>>>
>>>>>>>>> This is the code:
>>>>>>>>>
>>>>>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>>>>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>>>>>
>>>>>>>>> It compiles, but at runtime:
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.RuntimeException: Unable to automatically
>>>>>>>>> infer a Coder for the Kafka Deserializer class
>>>>>>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder
>>>>>>>>> registered for type class java.lang.Object
>>>>>>>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>>>>>
>>>>>>>>> So far the only thing I've got working is this, where I use the
>>>>>>>>> ByteArrayDeserializer and then parse Avro myself:
>>>>>>>>>
>>>>>>>>>     private static KafkaAvroDecoder avroDecoder;
>>>>>>>>>     static {
>>>>>>>>>         final Properties props = new Properties();
>>>>>>>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>>                 "http://registry:8081");
>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>>>>>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>>>>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>
>>>>>>>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>>>>>>>         Pipeline p = Pipeline.create(options);
>>>>>>>>>
>>>>>>>>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>>>>>                 .withBootstrapServers("kafka:9092")
>>>>>>>>>                 .withTopic("dbserver1.inventory.customers")
>>>>>>>>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>>>>>                 .withoutMetadata()
>>>>>>>>>         )
>>>>>>>>>                 .apply(Values.<byte[]>create())
>>>>>>>>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>>>>>>                     @ProcessElement
>>>>>>>>>                     public void processElement(ProcessContext c) {
>>>>>>>>>                         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>>>>>>                         c.output(data);
>>>>>>>>>                     }
>>>>>>>>>                 }))
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>>>>>
>>>>>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <
>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>> It seems that KafkaAvroDeserializer implements
>>>>>>>>> Deserializer<Object>, though I suppose with proper configuration that
>>>>>>>>> Object will at run-time be your desired type. Have you tried adding
>>>>>>>>> some Java type casts to make it compile?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> +1, a cast might be the simplest fix. Alternatively, you can wrap or
>>>>>>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the
>>>>>>>>> Object returned by KafkaAvroDeserializer::deserialize() to Envelope
>>>>>>>>> at runtime.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <
>>>>>>>>> timrobertson...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> I just tried quickly and see the same as you, Andrew.
>>>>>>>>> Either we're missing something obvious, or extending
>>>>>>>>> KafkaAvroDeserializer seems necessary, right?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
>>>>>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I
>>>>>>>>> think it should be as simple as:
>>>>>>>>>
>>>>>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>>>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>>>>>   AvroCoder.of(Envelope.class))
>>>>>>>>>
>>>>>>>>> Where Envelope is the name of the Avro class. However, that does
>>>>>>>>> not compile and I get the following error:
>>>>>>>>>
>>>>>>>>> incompatible types:
>>>>>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>>>>>> cannot be converted to java.lang.Class<? extends
>>>>>>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>>>>>>
>>>>>>>>> I've tried a number of variations on this theme but haven't yet
>>>>>>>>> worked it out and am starting to run out of ideas...
>>>>>>>>>
>>>>>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>>>>>
>>>>>>>>> The code I'm using can be found at
>>>>>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a
>>>>>>>>> full environment can be created with Docker.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>
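
For anyone finding this in the archive, here is a rough, self-contained sketch of why the raw (Class) cast quoted above compiles. KafkaAvroDeserializer is declared as Deserializer<Object>, so its Class object is not a Class<? extends Deserializer<Envelope>>, whereas a raw Class is accepted through an unchecked conversion. Everything below (Deserializer, ObjectDeserializer, TypedReader, RawClassCastSketch) is a hypothetical stand-in rather than a Kafka or Beam class, so it runs without either library.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Deserializer interface, trimmed to one method.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Stand-in for KafkaAvroDeserializer: declared as Deserializer<Object>,
// even though the value it returns here is always a String.
class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// Stand-in for a typed reader whose setter has the same parameter shape
// as the one in the compiler error: Class<? extends Deserializer<V>>.
class TypedReader<V> {
    private Class<? extends Deserializer<V>> type;

    void useValueDeserializer(Class<? extends Deserializer<V>> type) {
        this.type = type;
    }

    V readOne(byte[] data) {
        try {
            // Instantiate reflectively through the no-arg constructor.
            Deserializer<V> d = type.getDeclaredConstructor().newInstance();
            return d.deserialize("topic", data);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

class RawClassCastSketch {
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String demo() {
        TypedReader<String> reader = new TypedReader<>();
        // reader.useValueDeserializer(ObjectDeserializer.class);
        //   is rejected: Class<ObjectDeserializer> is not a
        //   Class<? extends Deserializer<String>>.
        // The raw cast turns that error into an unchecked warning.
        reader.useValueDeserializer((Class) ObjectDeserializer.class);
        return reader.readOne("hello".getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The trade-off is that the compiler no longer checks the value type, so a mismatch would only show up as a ClassCastException at run time.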

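A small sketch of the generic wrapper idea quoted above (TypedKafkaAvroDeserializer<T>), again with hypothetical stand-ins instead of the Kafka and Confluent classes so that it is runnable on its own. One assumption that is mine and not from the thread: the concrete subclass passes a class token to the wrapper, which gives a checked cast without storing the type in the consumer config. UntypedDecoder plays the role of AbstractKafkaAvroDeserializer here.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Deserializer interface, trimmed to one method.
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Stand-in for the untyped base class: decodes bytes to a plain Object.
abstract class UntypedDecoder {
    protected Object deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// Generic wrapper: delegates to the untyped decode, then casts using the
// class token (an assumption of this sketch, not part of the quoted code).
abstract class TypedDecoder<T> extends UntypedDecoder implements Deserializer<T> {
    private final Class<T> type;

    protected TypedDecoder(Class<T> type) {
        this.type = type;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Class.cast throws ClassCastException here if the decoded type is wrong.
        return type.cast(deserialize(data));
    }
}

// Concrete subclass with a no-arg constructor, so it can be created reflectively.
class StringDecoder extends TypedDecoder<String> {
    StringDecoder() {
        super(String.class);
    }
}

class TypedDecoderSketch {
    static String demo() {
        Deserializer<String> d = new StringDecoder();
        return d.deserialize("topic", "hello".getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape the per-type subclass stays a one-liner, and a type mismatch fails inside deserialize rather than somewhere downstream.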