I don't think extending the class is necessary. I'm not sure I understand why
a simple type cast for withValueDeserializerAndCoder doesn't work. Have you
tried this?

p.apply(KafkaIO.<String, Envelope>read()
  .withValueDeserializerAndCoder(
      // raw Class cast: KafkaAvroDeserializer implements Deserializer<Object>
      (Class) KafkaAvroDeserializer.class,
      AvroCoder.of(Envelope.class)));
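Why the cast has to go through a raw or wildcard Class: KafkaAvroDeserializer implements Deserializer<Object>, so once KafkaIO.<String, Envelope>read() fixes the value type, KafkaAvroDeserializer.class cannot satisfy a Class<? extends Deserializer<Envelope>> parameter without an unchecked cast. A minimal sketch of the same situation, using simplified stand-in types (Deserializer, Envelope, ObjectDeserializer and instantiate are placeholders written for this example, not the real Kafka, Confluent or Beam classes):

```java
import java.nio.charset.StandardCharsets;

public class RawClassCastDemo {
    // Simplified stand-in for org.apache.kafka.common.serialization.Deserializer.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] bytes);
    }

    // Stand-in for an Avro-generated record class such as Envelope.
    static class Envelope {
        final String payload;

        Envelope(String payload) {
            this.payload = payload;
        }
    }

    // Stand-in for KafkaAvroDeserializer: declared as Deserializer<Object>,
    // but at run time it returns the concrete record class.
    public static class ObjectDeserializer implements Deserializer<Object> {
        @Override
        public Object deserialize(String topic, byte[] bytes) {
            return new Envelope(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    // Same parameter shape as the first argument of withValueDeserializerAndCoder:
    // a Class that must produce a Deserializer<V>.
    static <V> Deserializer<V> instantiate(Class<? extends Deserializer<V>> cls) {
        try {
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // With V fixed to Envelope, RawClassCastDemo.<Envelope>instantiate(ObjectDeserializer.class)
    // does not compile. Widening to Class<?> first and then casting compiles,
    // with an unchecked warning (suppressed here).
    @SuppressWarnings("unchecked")
    static Class<? extends Deserializer<Envelope>> envelopeDeserializerClass() {
        return (Class<? extends Deserializer<Envelope>>) (Class<?>) ObjectDeserializer.class;
    }

    static String roundTrip(String text) {
        Deserializer<Envelope> d = instantiate(envelopeDeserializerClass());
        // javac inserts a checkcast to Envelope here; it succeeds only because
        // the object really is an Envelope at run time.
        Envelope e = d.deserialize("topic", text.getBytes(StandardCharsets.UTF_8));
        return e.payload;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

With the real classes the equivalent is an unchecked cast such as (Class) KafkaAvroDeserializer.class. The trade-off is that the compiler no longer checks that the deserializer really produces Envelope, so a misconfiguration would only show up at run time.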

On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com>
wrote:

> Hi Raghu
>
> I tried that, but with KafkaAvroDeserializer already implementing
> Deserializer<Object> I couldn't get it to work... I didn't spend too much
> time on it, though, and agree something like that would be cleaner.
>
> Cheers,
> Tim
>
> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>
>> Thanks Tim.
>>
>> How about extending KafkaAvroDeserializer rather
>> than AbstractKafkaAvroDeserializer?
>>
>> The TypedKafkaAvroDeserializer class below is useful, but not directly
>> usable yet. It needs to store the actual type in the Kafka consumer config
>> so it can be retrieved at run time.
>> Even without storing the class, it is still useful. It simplifies user
>> code:
>>
>> public class EnvelopeKafkaAvroDeserializer extends
>>     TypedKafkaAvroDeserializer<Envelope> {}
>>
>> This should be part of the same package as KafkaAvroDeserializer (I'm
>> surprised it is not there yet).
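One way to read "store the actual type in Kafka consumer config": the deserializer reads a class name from its config map in configure() and then uses Class.cast in deserialize(). A sketch under that assumption, with stand-in types; the config key value.avro.class is hypothetical (not a Kafka or Confluent setting), and decodeUntyped stands in for the untyped Avro decode step:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class ConfiguredTypeDeserializerDemo {
    // Simplified stand-in for Kafka's Deserializer (configure / deserialize / close).
    interface Deserializer<T> {
        void configure(Map<String, ?> configs, boolean isKey);

        T deserialize(String topic, byte[] bytes);

        void close();
    }

    // Hypothetical config key, invented for this sketch.
    static final String VALUE_CLASS_CONFIG = "value.avro.class";

    // Stand-in for the untyped decode step (the Confluent base class returns Object).
    static Object decodeUntyped(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static class TypedDeserializer<T> implements Deserializer<T> {
        private Class<T> type;

        @Override
        @SuppressWarnings("unchecked")
        public void configure(Map<String, ?> configs, boolean isKey) {
            Object name = configs.get(VALUE_CLASS_CONFIG);
            try {
                // The concrete type is recovered from the consumer config at run time.
                type = (Class<T>) Class.forName(String.valueOf(name));
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("Unknown value class: " + name, e);
            }
        }

        // The recovered type; a coder lookup could in principle use this.
        Class<T> type() {
            return type;
        }

        @Override
        public T deserialize(String topic, byte[] bytes) {
            // Class.cast checks the run-time type, unlike an unchecked (T) cast.
            return type.cast(decodeUntyped(bytes));
        }

        @Override
        public void close() {}
    }

    public static void main(String[] args) {
        TypedDeserializer<String> d = new TypedDeserializer<>();
        d.configure(Collections.singletonMap(VALUE_CLASS_CONFIG, "java.lang.String"), false);
        System.out.println(d.type() + " -> "
                + d.deserialize("topic", "hello".getBytes(StandardCharsets.UTF_8)));
    }
}
```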
>>
>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson...@gmail.com>
>> wrote:
>>
>>> Happy to hear it!
>>>
>>> I wonder if we could do something like this (totally untested):
>>>
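>>> public class TypedKafkaAvroDeserializer<T> extends
>>>     AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>     @Override
>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>     }
>>>     @Override
>>>     @SuppressWarnings("unchecked") // deserialize(byte[]) returns Object
>>>     public T deserialize(String s, byte[] bytes) {
>>>         return (T) this.deserialize(bytes);
>>>     }
>>>     @Override
>>>     public void close() {}
>>> }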
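Why a plain TypedKafkaAvroDeserializer<T> cannot report its T at run time, while a one-line subclass such as EnvelopeKafkaAvroDeserializer can: type arguments of a generic instance are erased, but the type argument in a subclass's extends clause is kept in the class file and is readable through reflection. A stand-in sketch (all types are placeholders, and this makes no claim about how KafkaIO itself infers coders):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeArgumentDemo {
    // Simplified stand-in for Kafka's Deserializer interface.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] bytes);
    }

    // Stand-in for an Avro-generated record class.
    static class Envelope {}

    // Generic base class in the spirit of TypedKafkaAvroDeserializer<T>.
    static class TypedDeserializer<T> implements Deserializer<T> {
        @Override
        public T deserialize(String topic, byte[] bytes) {
            throw new UnsupportedOperationException("decoding is not the point here");
        }
    }

    // Concrete subclass: Envelope is recorded in its generic superclass signature.
    static class EnvelopeDeserializer extends TypedDeserializer<Envelope> {}

    // Returns the first type argument of cls's generic superclass, or null if
    // the superclass is not parameterized.
    static Type superclassTypeArgument(Class<?> cls) {
        Type superType = cls.getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        return null;
    }

    public static void main(String[] args) {
        // An instance created as TypedDeserializer<Envelope> is just a
        // TypedDeserializer at run time; its superclass is Object.
        Deserializer<Envelope> erased = new TypedDeserializer<>();
        System.out.println(superclassTypeArgument(erased.getClass())); // prints null
        // The concrete subclass still carries the type argument.
        System.out.println(superclassTypeArgument(EnvelopeDeserializer.class));
    }
}
```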
>>>
>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>>> andrew+b...@andrew-jones.com> wrote:
>>>
>>>> Thanks Tim, that works!
>>>>
>>>> Full code is:
>>>>
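>>>> import dbserver1.inventory.customers.Envelope;
>>>> import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
>>>> import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
>>>> import java.util.Map;
>>>> import org.apache.kafka.common.serialization.Deserializer;
>>>>
>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>         AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>     @Override
>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>     }
>>>>
>>>>     @Override
>>>>     public Envelope deserialize(String s, byte[] bytes) {
>>>>         // the inherited deserialize(byte[]) returns Object
>>>>         return (Envelope) this.deserialize(bytes);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void close() {}
>>>> }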
>>>>
>>>> Nicer than my solution, so I think that's the one I'm going to go with
>>>> for now.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>
>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>
>>>> Hi Andrew,
>>>>
>>>> I also saw the same behaviour.
>>>>
>>>> It's not pretty, but perhaps try this? It was my last idea before I ran
>>>> out of time to try it...
>>>>
>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>> public class EnvelopeAvroDeserializer extends
>>>>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>
>>>>   ...
>>>>
>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>     return (Envelope) this.deserialize(bytes);
>>>>   }
>>>>
>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>   }
>>>>
>>>>   ...
>>>>
>>>> }
>>>>
>>>>  Tim
>>>>
>>>>
>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>>>> andrew+b...@andrew-jones.com> wrote:
>>>>
>>>>
>>>> Using Object doesn't work, unfortunately. I get an 'Unable to
>>>> automatically infer a Coder' error at runtime.
>>>>
>>>> This is the code:
>>>>
>>>> p.apply(KafkaIO.<String, Object>read()
>>>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>
>>>> It compiles, but at runtime:
>>>>
>>>> Caused by: java.lang.RuntimeException: Unable to automatically infer a
>>>> Coder for the Kafka Deserializer class
>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer: no coder registered
>>>> for type class java.lang.Object
>>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
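The error reads as if coder inference looks at the Deserializer<T> type argument declared on the deserializer class, and for KafkaAvroDeserializer that argument is Object. A toy version of such a lookup, assuming a hypothetical REGISTRY map rather than Beam's actual CoderRegistry, shows why a class declared as Deserializer<Envelope> can be resolved while one declared as Deserializer<Object> cannot:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

public class CoderInferenceDemo {
    // Simplified stand-in for Kafka's Deserializer interface.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] bytes);
    }

    // Stand-in for an Avro-generated record class.
    static class Envelope {}

    // Like KafkaAvroDeserializer: the declared value type is Object.
    static class ObjectDeserializer implements Deserializer<Object> {
        @Override
        public Object deserialize(String topic, byte[] bytes) {
            return new Envelope();
        }
    }

    // Declares the concrete record type directly on the interface.
    static class EnvelopeDeserializer implements Deserializer<Envelope> {
        @Override
        public Envelope deserialize(String topic, byte[] bytes) {
            return new Envelope();
        }
    }

    // Hypothetical registry mapping a value type to a coder name.
    static final Map<Type, String> REGISTRY = new HashMap<>();

    static {
        REGISTRY.put(Envelope.class, "AvroCoder<Envelope>");
    }

    // Reads T from a directly implemented Deserializer<T> and looks up a coder for it.
    static String inferCoder(Class<?> deserializerClass) {
        for (Type t : deserializerClass.getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Deserializer.class) {
                Type valueType = ((ParameterizedType) t).getActualTypeArguments()[0];
                String coder = REGISTRY.get(valueType);
                if (coder == null) {
                    throw new IllegalStateException("no coder registered for type " + valueType);
                }
                return coder;
            }
        }
        throw new IllegalStateException("no Deserializer<T> found on " + deserializerClass);
    }

    public static void main(String[] args) {
        System.out.println(inferCoder(EnvelopeDeserializer.class));
        try {
            inferCoder(ObjectDeserializer.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```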
>>>>
>>>> So far the only thing I've got working is this, where I use the
>>>> ByteArrayDeserializer and then parse Avro myself:
>>>>
>>>>     private static KafkaAvroDecoder avroDecoder;
>>>>     static {
>>>>         final Properties props = new Properties();
>>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>                 "http://registry:8081");
>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>>>     }
>>>>
>>>>     public static void main(String[] args) {
>>>>
>>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>>         Pipeline p = Pipeline.create(options);
>>>>
>>>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>>>                 .withBootstrapServers("kafka:9092")
>>>>                 .withTopic("dbserver1.inventory.customers")
>>>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>                 .withoutMetadata())
>>>>                 .apply(Values.<byte[]>create())
>>>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>>                     @ProcessElement
>>>>                     public void processElement(ProcessContext c) {
>>>>                         // fromBytes returns Object; cast to the generated class
>>>>                         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>>                         c.output(data);
>>>>                     }
>>>>                 }));
>>>>
>>>>         p.run().waitUntilFinish();
>>>>     }
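For context on what the ByteArrayDeserializer approach hands to avroDecoder.fromBytes: values written with Confluent's Avro serializer start with a magic byte 0, then a 4-byte big-endian schema ID, then the Avro binary body. A small sketch of splitting off that header (decoding the body needs the writer schema fetched from the registry by ID, which is left out here):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormat {
    // First byte of every Schema Registry framed message.
    static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4-byte schema ID.
    static final int HEADER_LENGTH = 5;

    static final class Framed {
        final int schemaId;
        final byte[] avroBody;

        Framed(int schemaId, byte[] avroBody) {
            this.schemaId = schemaId;
            this.avroBody = avroBody;
        }
    }

    // Splits a raw Kafka value (as passed through by ByteArrayDeserializer)
    // into the schema ID and the remaining Avro binary body.
    static Framed parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("too short to be a Schema Registry framed message");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        byte[] body = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
        return new Framed(schemaId, body);
    }

    public static void main(String[] args) {
        byte[] message = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x02, 0x61};
        Framed framed = parse(message);
        // prints "schema id 42, body length 2"
        System.out.println("schema id " + framed.schemaId + ", body length " + framed.avroBody.length);
    }
}
```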
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>
>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <
>>>> kirpic...@google.com> wrote:
>>>>
>>>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>>>> though I suppose with proper configuration that Object will at run-time be
>>>> your desired type. Have you tried adding some Java type casts to make it
>>>> compile?
>>>>
>>>>
>>>> +1, a cast might be the simplest fix. Alternatively, you can wrap or
>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>>>> returned by KafkaAvroDeserializer::deserialize() to Envelope at runtime.
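The "wrap" option could look roughly like the sketch below: a deserializer that delegates to an untyped one and casts each result with Class.cast. The types are simplified stand-ins, not the Kafka or Confluent classes, and CastingDeserializer / EnvelopeDeserializer are hypothetical names:

```java
import java.nio.charset.StandardCharsets;

public class CastingDeserializerDemo {
    // Simplified stand-in for Kafka's Deserializer interface.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] bytes);
    }

    // Stand-in for an Avro-generated record class.
    static class Envelope {
        final String payload;

        Envelope(String payload) {
            this.payload = payload;
        }
    }

    // Stand-in for KafkaAvroDeserializer: typed as Object, returns an Envelope.
    static class ObjectDeserializer implements Deserializer<Object> {
        @Override
        public Object deserialize(String topic, byte[] bytes) {
            return new Envelope(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    // Wrapper: delegates to an untyped deserializer and casts each result with
    // Class.cast, so a wrong type fails fast with ClassCastException.
    static class CastingDeserializer<T> implements Deserializer<T> {
        private final Deserializer<Object> delegate;
        private final Class<T> type;

        CastingDeserializer(Deserializer<Object> delegate, Class<T> type) {
            this.delegate = delegate;
            this.type = type;
        }

        @Override
        public T deserialize(String topic, byte[] bytes) {
            return type.cast(delegate.deserialize(topic, bytes));
        }
    }

    // APIs that take a deserializer Class and instantiate it reflectively need a
    // public no-arg constructor, hence one small subclass per record type.
    public static class EnvelopeDeserializer extends CastingDeserializer<Envelope> {
        public EnvelopeDeserializer() {
            super(new ObjectDeserializer(), Envelope.class);
        }
    }

    public static void main(String[] args) {
        Envelope e = new EnvelopeDeserializer()
                .deserialize("topic", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(e.payload);
    }
}
```

With the real Kafka Deserializer interface, such a wrapper would also have to forward configure() and close() to the delegate so that the schema registry settings still reach it.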
>>>>
>>>>
>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <
>>>> timrobertson...@gmail.com> wrote:
>>>>
>>>> I just tried quickly and see the same as you, Andrew.
>>>> Either we're missing something obvious, or extending
>>>> KafkaAvroDeserializer seems necessary, right?
>>>>
>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
>>>> andrew+b...@andrew-jones.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>>>> it should be as simple as:
>>>>
>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>   AvroCoder.of(Envelope.class)));
>>>>
>>>> Where Envelope is the name of the Avro class. However, that does not
>>>> compile and I get the following error:
>>>>
>>>> incompatible types:
>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>> cannot be converted to java.lang.Class<? extends
>>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>>
>>>> I've tried a number of variations on this theme but haven't yet worked
>>>> it out and am starting to run out of ideas...
>>>>
>>>> Has anyone successfully read Avro data from Kafka?
>>>>
>>>> The code I'm using can be found at
>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>>>> environment can be created with Docker.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
