Hi Raghu

I tried that but with KafkaAvroDeserializer already implementing
Deserializer<Object> I couldn't get it to work... I didn't spend too much
time though and agree something like that would be cleaner.
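
For readers following along: the likely reason extending KafkaAvroDeserializer fails is Java's rule that a class cannot implement the same generic interface with two different type arguments, so a subclass of a Deserializer<Object> cannot also be a Deserializer<Envelope>. Below is a minimal sketch of one workaround, composition instead of inheritance. The Deserializer interface and the ObjectDeserializer class are stand-ins defined only to keep the example self-contained (they are not the Kafka or Confluent classes), and TypedDeserializer, StringValueDeserializer and TypedDeserializerSketch are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for org.apache.kafka.common.serialization.Deserializer (assumption:
// only deserialize() matters for this sketch; configure()/close() are omitted).
interface Deserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Stand-in for KafkaAvroDeserializer, which is declared as Deserializer<Object>.
class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public Object deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }
}

// The following would NOT compile, because Deserializer would be inherited
// with two different type arguments (Object and T):
// class Typed<T> extends ObjectDeserializer implements Deserializer<T> {}

// Composition: hold the Object-typed deserializer and cast its result.
class TypedDeserializer<T> implements Deserializer<T> {
    private final Deserializer<Object> delegate;
    private final Class<T> type;

    TypedDeserializer(Deserializer<Object> delegate, Class<T> type) {
        this.delegate = delegate;
        this.type = type;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // Class.cast throws ClassCastException if the runtime type is wrong.
        return type.cast(delegate.deserialize(topic, data));
    }
}

// Concrete subclass with a no-arg constructor, so it can be referenced
// by class and instantiated reflectively.
class StringValueDeserializer extends TypedDeserializer<String> {
    StringValueDeserializer() {
        super(new ObjectDeserializer(), String.class);
    }
}

class TypedDeserializerSketch {
    public static void main(String[] args) {
        Deserializer<String> d = new StringValueDeserializer();
        String value = d.deserialize("some-topic", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(value);
    }
}
```

With the real classes, the delegate would presumably be a KafkaAvroDeserializer, and the wrapper would also need to forward configure() and close() to it; that part is untested here.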

Cheers,
Tim

On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:

> Thanks Tim.
>
> How about extending KafkaAvroDeserializer rather than
> AbstractKafkaAvroDeserializer?
>
> The TypedKafkaAvroDeserializer class below is useful, but not directly
> usable yet. It needs to store the actual type in the Kafka consumer config
> so it can be retrieved at run time.
> Even without storing the class, it is still useful. It simplifies user
> code:
>
> public class EnvelopeKafkaAvroDeserializer extends
> TypedKafkaAvroDeserializer<Envelope> {}
>
> This should be part of the same package as KafkaAvroDeserializer (surprised
> it is not there yet).
>
> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
>> Happy to hear
>>
>> I wonder if we could do something like this (totally untested):
>>
>> public class TypedKafkaAvroDeserializer<T> extends
>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>     @Override
>>     public T deserialize(String s, byte[] bytes) {
>>         return (T) this.deserialize(bytes);
>>     }
>> }
>>
>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>> andrew+b...@andrew-jones.com> wrote:
>>
>>> Thanks Tim, that works!
>>>
>>> Full code is:
>>>
>>> public class EnvelopeKafkaAvroDeserializer extends
>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>     @Override
>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>     }
>>>
>>>     @Override
>>>     public Envelope deserialize(String s, byte[] bytes) {
>>>         return (Envelope) this.deserialize(bytes);
>>>     }
>>>
>>>     @Override
>>>     public void close() {}
>>> }
>>>
>>> Nicer than my solution, so I think that is the one I'm going to go with
>>> for now.
>>>
>>> Thanks,
>>> Andrew
>>>
>>>
>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>
>>> Hi Andrew,
>>>
>>> I also saw the same behaviour.
>>>
>>> It's not pretty, but perhaps try this? It was the last idea I had, but I
>>> ran out of time to try it...
>>>
>>>
>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>> public class EnvelopeAvroDeserializer extends
>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>
>>>   ...
>>>
>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>     return (Envelope) this.deserialize(bytes);
>>>   }
>>>
>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>   }
>>>
>>>   ...
>>>
>>> }
>>>
>>>  Tim
>>>
>>>
>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>>> andrew+b...@andrew-jones.com> wrote:
>>>
>>>
>>> Using Object doesn't work unfortunately. I get an 'Unable to
>>> automatically infer a Coder' error at runtime.
>>>
>>> This is the code:
>>>
>>> p.apply(KafkaIO.<String, Object>read()
>>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>>
>>> It compiles, but at runtime:
>>>
>>> Caused by: java.lang.RuntimeException: Unable to automatically infer a
>>> Coder for the Kafka Deserializer class 
>>> io.confluent.kafka.serializers.KafkaAvroDeserializer:
>>> no coder registered for type class java.lang.Object
>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>
>>> So far the only thing I've got working is this, where I use the
>>> ByteArrayDeserializer and then parse Avro myself:
>>>
>>>     private static KafkaAvroDecoder avroDecoder;
>>>     static {
>>>         final Properties props = new Properties();
>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>>> "kafka:9092");
>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>> "http://registry:8081");
>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>> true);
>>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>>     }
>>>
>>>     public static void main(String[] args) {
>>>
>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>         Pipeline p = Pipeline.create(options);
>>>
>>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>>                 .withBootstrapServers("kafka:9092")
>>>                 .withTopic("dbserver1.inventory.customers")
>>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>                 .withoutMetadata())
>>>                 .apply(Values.<byte[]>create())
>>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[],
>>> Envelope>() {
>>>                     @ProcessElement
>>>                     public void processElement(ProcessContext c) {
>>>                         Envelope data = (Envelope)
>>> avroDecoder.fromBytes(c.element());
>>>                         c.output(data);
>>>                     }
>>>                 }))
>>>
>>> Thanks,
>>> Andrew
>>>
>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>
>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com
>>> > wrote:
>>>
>>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>>> though I suppose with proper configuration that Object will at run-time be
>>> your desired type. Have you tried adding some Java type casts to make it
>>> compile?
>>>
>>>
>>> +1, a cast might be the simplest fix. Alternatively, you can wrap or
>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>>> returned by KafkaAvroDeserializer::deserialize() to Envelope at
>>> runtime.
>>>
>>>
>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com>
>>> wrote:
>>>
>>> I just tried quickly and see the same as you, Andrew.
>>> Either we're missing something obvious, or extending
>>> KafkaAvroDeserializer seems necessary, right?
>>>
>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
>>> andrew+b...@andrew-jones.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>>> it should be as simple as:
>>>
>>> p.apply(KafkaIO.<String, Envelope>read()
>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>   AvroCoder.of(Envelope.class))
>>>
>>> Where Envelope is the name of the Avro class. However, that does not
>>> compile and I get the following error:
>>>
>>> incompatible types:
>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>> cannot be converted to java.lang.Class<? extends
>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>
>>> I've tried a number of variations on this theme but haven't yet worked
>>> it out and am starting to run out of ideas...
>>>
>>> Has anyone successfully read Avro data from Kafka?
>>>
>>> The code I'm using can be found at
>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>>> environment can be created with Docker.
>>>
>>> Thanks,
>>> Andrew
>>>
>>>
>>>
>>>
>>
>
