Hi Raghu,

I tried that, but with KafkaAvroDeserializer already implementing
Deserializer<Object> I couldn't get it to work... I didn't spend too much
time on it though, and agree something like that would be cleaner.
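The delegation route might be worth a sketch though, since Java won't let a
subclass implement Deserializer<Envelope> when its parent already implements
Deserializer<Object>. Totally untested:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// Wrap KafkaAvroDeserializer by composition instead of inheritance.
public class TypedKafkaAvroDeserializer<T> implements Deserializer<T> {
  private final KafkaAvroDeserializer inner = new KafkaAvroDeserializer();

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    inner.configure(configs, isKey);
  }

  @SuppressWarnings("unchecked")
  @Override
  public T deserialize(String topic, byte[] bytes) {
    // The schema registry decides the runtime type; the cast just
    // narrows it for the caller.
    return (T) inner.deserialize(topic, bytes);
  }

  @Override
  public void close() {
    inner.close();
  }
}

A concrete subclass per type (like your EnvelopeKafkaAvroDeserializer below)
would still be needed, since the deserializer class is instantiated
reflectively and can't know T otherwise.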
Cheers,
Tim

On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:

> Thanks Tim.
>
> How about extending KafkaAvroDeserializer rather than
> AbstractKafkaAvroDeserializer?
>
> The TypedKafkaAvroDeserializer class below is useful, but not directly
> usable yet. It needs to store the actual type in the Kafka consumer
> config to retrieve it at run time.
> Even without storing the class, it is still useful. It simplifies user
> code:
>
> public class EnvelopeKafkaAvroDeserializer extends
>     TypedKafkaAvroDeserializer<Envelope> {}
>
> This should be part of the same package as KafkaAvroDeserializer
> (surprised it is not there yet).
>
> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
>> Happy to hear.
>>
>> I wonder if we could do something like this (totally untested):
>>
>> public class TypedKafkaAvroDeserializer<T> extends
>>     AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>   @Override
>>   public T deserialize(String s, byte[] bytes) {
>>     return (T) this.deserialize(bytes);
>>   }
>> }
>>
>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones
>> <andrew+b...@andrew-jones.com> wrote:
>>
>>> Thanks Tim, that works!
>>>
>>> Full code is:
>>>
>>> public class EnvelopeKafkaAvroDeserializer extends
>>>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>   @Override
>>>   public void configure(Map<String, ?> configs, boolean isKey) {
>>>     configure(new KafkaAvroDeserializerConfig(configs));
>>>   }
>>>
>>>   @Override
>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>     return (Envelope) this.deserialize(bytes);
>>>   }
>>>
>>>   @Override
>>>   public void close() {}
>>> }
>>>
>>> Nicer than my solution, so I think that is the one I'm going to go
>>> with for now.
>>>
>>> Thanks,
>>> Andrew
>>>
>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>
>>> Hi Andrew,
>>>
>>> I also saw the same behaviour.
>>>
>>> It's not pretty, but perhaps try this? It was my last idea; I ran out
>>> of time to try it...
>>>
>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>> public class EnvelopeAvroDeserializer extends
>>>     AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>
>>>   ...
>>>
>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>     return (Envelope) this.deserialize(bytes);
>>>   }
>>>
>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>   }
>>>
>>>   ...
>>> }
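>>>
>>> If that works, the wiring would presumably be (again untested):
>>>
>>> p.apply(KafkaIO.<String, Envelope>read()
>>>     .withValueDeserializerAndCoder(EnvelopeAvroDeserializer.class,
>>>         AvroCoder.of(Envelope.class))
>>>     ...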
>>>
>>> Tim
>>>
>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones
>>> <andrew+b...@andrew-jones.com> wrote:
>>>
>>> Using Object doesn't work unfortunately. I get an 'Unable to
>>> automatically infer a Coder' error at runtime.
>>>
>>> This is the code:
>>>
>>> p.apply(KafkaIO.<String, Object>read()
>>>     .withValueDeserializer(KafkaAvroDeserializer.class)
>>>
>>> It compiles, but at runtime:
>>>
>>> Caused by: java.lang.RuntimeException: Unable to automatically infer a
>>> Coder for the Kafka Deserializer class
>>> io.confluent.kafka.serializers.KafkaAvroDeserializer:
>>> no coder registered for type class java.lang.Object
>>>   at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>
>>> So far the only thing I've got working is this, where I use the
>>> ByteArrayDeserializer and then parse the Avro myself:
>>>
>>> private static KafkaAvroDecoder avroDecoder;
>>> static {
>>>   final Properties props = new Properties();
>>>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
>>>   props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>       "http://registry:8081");
>>>   props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
>>>   VerifiableProperties vProps = new VerifiableProperties(props);
>>>   avroDecoder = new KafkaAvroDecoder(vProps);
>>> }
>>>
>>> public static void main(String[] args) {
>>>
>>>   PipelineOptions options = PipelineOptionsFactory.create();
>>>   Pipeline p = Pipeline.create(options);
>>>
>>>   p.apply(KafkaIO.<byte[], byte[]>read()
>>>       .withBootstrapServers("kafka:9092")
>>>       .withTopic("dbserver1.inventory.customers")
>>>       .withKeyDeserializer(ByteArrayDeserializer.class)
>>>       .withValueDeserializer(ByteArrayDeserializer.class)
>>>       .withoutMetadata())
>>>     .apply(Values.<byte[]>create())
>>>     .apply("ParseAvro", ParDo.of(new DoFn<byte[], Envelope>() {
>>>       @ProcessElement
>>>       public void processElement(ProcessContext c) {
>>>         Envelope data = (Envelope) avroDecoder.fromBytes(c.element());
>>>         c.output(data);
>>>       }
>>>     }));
>>>
>>> Thanks,
>>> Andrew
>>>
>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>
>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov
>>> <kirpic...@google.com> wrote:
>>>
>>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>>> though I suppose with proper configuration that Object will at run time
>>> be your desired type. Have you tried adding some Java type casts to
>>> make it compile?
>>>
>>> +1, a cast might be the simplest fix. Alternately you can wrap or
>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>>> returned by KafkaAvroDeserializer::deserialize() to Envelope at
>>> runtime.
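>>>
>>> Something like this, perhaps (untested sketch, just to illustrate the
>>> cast; the explicit coder should also avoid the inference error you hit):
>>>
>>> // Force the class through a raw type so it satisfies
>>> // Deserializer<Envelope>; unchecked, but resolves the compile error.
>>> @SuppressWarnings({"unchecked", "rawtypes"})
>>> Class<Deserializer<Envelope>> deserializer = (Class) KafkaAvroDeserializer.class;
>>>
>>> p.apply(KafkaIO.<String, Envelope>read()
>>>     .withValueDeserializerAndCoder(deserializer, AvroCoder.of(Envelope.class))
>>>     ...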
>>>
>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson
>>> <timrobertson...@gmail.com> wrote:
>>>
>>> I just tried quickly and see the same as you, Andrew.
>>> We're missing something obvious, or else extending KafkaAvroDeserializer
>>> seems necessary, right?
>>>
>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones
>>> <andrew+b...@andrew-jones.com> wrote:
>>>
>>> Hi,
>>>
>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>>> it should be as simple as:
>>>
>>> p.apply(KafkaIO.<String, Envelope>read()
>>>     .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>         AvroCoder.of(Envelope.class))
>>>
>>> Where Envelope is the name of the Avro class. However, that does not
>>> compile, and I get the following error:
>>>
>>> incompatible types:
>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>> cannot be converted to java.lang.Class<? extends
>>> org.apache.kafka.common.serialization.Deserializer<dbserver1.inventory.customers.Envelope>>
>>>
>>> I've tried a number of variations on this theme but haven't yet worked
>>> it out, and am starting to run out of ideas...
>>>
>>> Has anyone successfully read Avro data from Kafka?
>>>
>>> The code I'm using can be found at
>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>>> environment can be created with Docker.
>>>
>>> Thanks,
>>> Andrew