Happy to hear

I wonder if we could do something like this (totally untested):

public class TypedKafkaAvroDeserializer<T> extends
AbstractKafkaAvroDeserializer implements Deserializer<T> {
   @Override
    public T deserialize(String s, byte[] bytes) {
        return (T) this.deserialize(bytes);
    }
}

On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <andrew+b...@andrew-jones.com
> wrote:

> Thanks Tim, that works!
>
> Full code is:
>
> public class EnvelopeKafkaAvroDeserializer extends
> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>     @Override
>     public void configure(Map<String, ?> configs, boolean isKey) {
>         configure(new KafkaAvroDeserializerConfig(configs));
>     }
>
>     @Override
>     public Envelope deserialize(String s, byte[] bytes) {
>         return (Envelope) this.deserialize(bytes);
>     }
>
>     @Override
>     public void close() {}
> }
>
> Nicer than my solution so think that is the one I'm going to go with for
> now.
>
> Thanks,
> Andrew
>
>
> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>
> Hi Andrew,
>
> I also saw the same behaviour.
>
> It's not pretty but perhaps try this? It was my last idea I ran out of
> time to try...
>
>
> *// Basically a copy KafkaAvroDeserializer with the casts in 
> deserialize**public class *EnvelopeAvroDeserializer *extends 
> *AbstractKafkaAvroDeserializer *implements *Deserializer<Envelope> {
>
>   ...
>
>   *public *Envelope deserialize(String s, *byte*[] bytes) {
>
>     *return *(Envelope) *this*.deserialize(bytes);
>
>   }
>
>
>
>   *public *Envelope deserialize(String s, *byte*[] bytes, Schema 
> readerSchema) {
>
>     *return *(Envelope) *this*.deserialize(bytes, readerSchema);
>
>   }
>
>
>
>   ...
>
> }
>
>  Tim
>
>
> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
>
> Using Object doesn't work unfortunately. I get an 'Unable to automatically
> infer a Coder' error at runtime.
>
> This is the code:
>
> p.apply(KafkaIO.<String, Object>read()
>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>
> It compiles, but at runtime:
>
> Caused by: java.lang.RuntimeException: Unable to automatically infer a
> Coder for the Kafka Deserializer class 
> io.confluent.kafka.serializers.KafkaAvroDeserializer:
> no coder registered for type class java.lang.Object
> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>
> So far the only thing I've got working is this, where I use the
> ByteArrayDeserializer and then parse Avro myself:
>
>     private static KafkaAvroDecoder avroDecoder;
>     static {
>         final Properties props = new Properties();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> "http://registry:8081";);
>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
> true);
>         VerifiableProperties vProps = new VerifiableProperties(props);
>         avroDecoder = new KafkaAvroDecoder(vProps);
>     }
>
>     public static void main(String[] args) {
>
>         PipelineOptions options = PipelineOptionsFactory.create();
>         Pipeline p = Pipeline.create(options);
>
>         p.apply(KafkaIO.<byte[], byte[]>read()
>                 .withBootstrapServers("kafka:9092")
>                 .withTopic("dbserver1.inventory.customers")
>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>                 .withValueDeserializer(ByteArrayDeserializer.class)
>                 .withoutMetadata(
>         )
>                 .apply(Values.<byte[]>create())
>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>                     @ProcessElement
>                     public void processElement(ProcessContext c) {
>                         Envelope data = (Envelope)
> avroDecoder.fromBytes(c.element());
>                         c.output(data);
>                     }
>                 }))
>
> Thanks,
> Andrew
>
> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>
> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
> though I suppose with proper configuration that Object will at run-time be
> your desired type. Have you tried adding some Java type casts to make it
> compile?
>
>
> +1, cast might be the simplest fix. Alternately you can wrap or
> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
> returned by KafkaAvroDeserializer::deserializer() to Envolope at runtime.
>
>
> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
> I just tried quickly and see the same as you Andrew.
> We're missing something obvious or else extending KafkaAvroDeserializer seems
> necessary right?
>
> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
> andrew+b...@andrew-jones.com> wrote:
>
> Hi,
>
> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
> it should be as simple as:
>
> p.apply(KafkaIO.<String, Envelope>*read*()
>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>   AvroCoder.of(Envelope.class))
>
> Where Envelope is the name of the Avro class. However, that does not
> compile and I get the following error:
>
> incompatible types:
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
> cannot be converted to java.lang.Class<? extends
> org.apache.kafka.common.serialization.Deserializer<dbserver1
> .inventory.customers.Envelope>>
>
> I've tried a number of variations on this theme but haven't yet worked
> it out and am starting to run out of ideas...
>
> Has anyone successfully read Avro data from Kafka?
>
> The code I'm using can be found at
> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
> environment can be created with Docker.
>
> Thanks,
> Andrew
>
>
>
>

Reply via email to