Hi Eugene,

As I understand it, that is where Andrew started, and it is the error he
reported.  I tried it and saw the same as him:

incompatible types:
java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
cannot be converted to
org.apache.kafka.common.serialization.Deserializer<org.gbif.pipelines.io.avro.Envelope>

similarly with
(Class<? extends Deserializer<Envelope>>) KafkaAvroDeserializer.class
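
For what it's worth, the usual Java way around this kind of "incompatible
types" error on a class token is a double cast through Class<?>, which
downgrades the error to an unchecked warning. Below is a minimal sketch of
that idea. It deliberately uses stand-in types instead of the real Kafka and
Confluent classes: Deserializer, ObjectDeserializer, readOne and
asStringDeserializer are all hypothetical names made up for illustration, and
I have not tested this against KafkaIO itself.

```java
import java.nio.charset.StandardCharsets;

public class DoubleCastSketch {

    /** Hypothetical stand-in for Kafka's Deserializer interface. */
    public interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Hypothetical stand-in for KafkaAvroDeserializer: declared as Deserializer<Object>. */
    public static class ObjectDeserializer implements Deserializer<Object> {
        public ObjectDeserializer() {}

        @Override
        public Object deserialize(String topic, byte[] data) {
            // The static type is Object, but the run-time value is a String.
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    /** Mimics an API that insists on a Class<? extends Deserializer<V>>. */
    public static <V> V readOne(Class<? extends Deserializer<V>> type, byte[] data) {
        try {
            Deserializer<V> d = type.getDeclaredConstructor().newInstance();
            return d.deserialize("topic", data);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not instantiate " + type, e);
        }
    }

    @SuppressWarnings("unchecked")
    public static Class<? extends Deserializer<String>> asStringDeserializer() {
        // A direct cast from Class<ObjectDeserializer> is rejected by javac.
        // Widening to Class<?> first makes it an unchecked cast instead.
        return (Class<? extends Deserializer<String>>) (Class<?>) ObjectDeserializer.class;
    }

    public static void main(String[] args) {
        String value = readOne(asStringDeserializer(),
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(value);
    }
}
```

The cast only silences the compiler: the deserializer still returns Object,
and a ClassCastException will surface later if the run-time value is not the
promised type. Whether KafkaIO is happy with a class token obtained this way
is an assumption on my part.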



On Thu, Oct 19, 2017 at 9:00 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> I don't think extending the class is necessary. Not sure I understand why
> a simple type casting for withDeserializerAndCoder doesn't work? Have you
> tried this?
>
> p.apply(KafkaIO.<String, Envelope>read()
>   .withValueDeserializerAndCoder((Deserializer<Envelope>)
> KafkaAvroDeserializer.class,
>   AvroCoder.of(Envelope.class))
>
> On Thu, Oct 19, 2017 at 11:45 AM Tim Robertson <timrobertson...@gmail.com>
> wrote:
>
>> Hi Raghu
>>
>> I tried that but with KafkaAvroDeserializer already implementing
>> Deserializer<Object> I couldn't get it to work... I didn't spend too
>> much time though and agree something like that would be cleaner.
>>
>> Cheers,
>> Tim
>>
>> On Thu, Oct 19, 2017 at 7:54 PM, Raghu Angadi <rang...@google.com> wrote:
>>
>>> Thanks Tim.
>>>
>>> How about extending KafkaAvroDeserializer rather than
>>> AbstractKafkaAvroDeserializer?
>>>
>>> TypedKafkaAvroDeserializer class below is useful, but not directly
>>> usable yet. It needs to store the actual type in Kafka consumer
>>> config to retrieve at run time.
>>> Even without storing the class, it is still useful. It simplifies user
>>> code:
>>>
>>> public class EnvelopeKafkaAvroDeserializer extends
>>> TypedKafkaAvroDeserializer<Envelope> {}
>>>
>>> This should be part of the same package as KafkaAvroDeserializer
>>> (surprised it is not there yet).
>>>
>>> On Thu, Oct 19, 2017 at 3:07 AM, Tim Robertson <
>>> timrobertson...@gmail.com> wrote:
>>>
>>>> Happy to hear
>>>>
>>>> I wonder if we could do something like this (totally untested):
>>>>
>>>> public class TypedKafkaAvroDeserializer<T> extends
>>>> AbstractKafkaAvroDeserializer implements Deserializer<T> {
>>>>    @Override
>>>>     public T deserialize(String s, byte[] bytes) {
>>>>         return (T) this.deserialize(bytes);
>>>>     }
>>>> }
>>>>
>>>> On Thu, Oct 19, 2017 at 12:03 PM, Andrew Jones <
>>>> andrew+b...@andrew-jones.com> wrote:
>>>>
>>>>> Thanks Tim, that works!
>>>>>
>>>>> Full code is:
>>>>>
>>>>> public class EnvelopeKafkaAvroDeserializer extends
>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>     @Override
>>>>>     public void configure(Map<String, ?> configs, boolean isKey) {
>>>>>         configure(new KafkaAvroDeserializerConfig(configs));
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Envelope deserialize(String s, byte[] bytes) {
>>>>>         return (Envelope) this.deserialize(bytes);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void close() {}
>>>>> }
>>>>>
>>>>> Nicer than my solution so think that is the one I'm going to go with
>>>>> for now.
>>>>>
>>>>> Thanks,
>>>>> Andrew
>>>>>
>>>>>
>>>>> On Thu, Oct 19, 2017, at 10:20 AM, Tim Robertson wrote:
>>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> I also saw the same behaviour.
>>>>>
>>>>> It's not pretty but perhaps try this? It was my last idea I ran out of
>>>>> time to try...
>>>>>
>>>>>
>>>>> // Basically a copy of KafkaAvroDeserializer with the casts in deserialize
>>>>> public class EnvelopeAvroDeserializer extends
>>>>> AbstractKafkaAvroDeserializer implements Deserializer<Envelope> {
>>>>>
>>>>>   ...
>>>>>
>>>>>   public Envelope deserialize(String s, byte[] bytes) {
>>>>>     return (Envelope) this.deserialize(bytes);
>>>>>   }
>>>>>
>>>>>   public Envelope deserialize(String s, byte[] bytes, Schema readerSchema) {
>>>>>     return (Envelope) this.deserialize(bytes, readerSchema);
>>>>>   }
>>>>>
>>>>>   ...
>>>>>
>>>>> }
>>>>>
>>>>>  Tim
>>>>>
>>>>>
>>>>> On Thu, Oct 19, 2017 at 10:52 AM, Andrew Jones <
>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>
>>>>>
>>>>> Using Object doesn't work unfortunately. I get an 'Unable to
>>>>> automatically infer a Coder' error at runtime.
>>>>>
>>>>> This is the code:
>>>>>
>>>>> p.apply(KafkaIO.<String, Object>read()
>>>>>                 .withValueDeserializer(KafkaAvroDeserializer.class)
>>>>>
>>>>> It compiles, but at runtime:
>>>>>
>>>>> Caused by: java.lang.RuntimeException: Unable to automatically infer a
>>>>> Coder for the Kafka Deserializer class 
>>>>> io.confluent.kafka.serializers.KafkaAvroDeserializer:
>>>>> no coder registered for type class java.lang.Object
>>>>> at org.apache.beam.sdk.io.kafka.KafkaIO.inferCoder(KafkaIO.java:1696)
>>>>>
>>>>> So far the only thing I've got working is this, where I use the
>>>>> ByteArrayDeserializer and then parse Avro myself:
>>>>>
>>>>>     private static KafkaAvroDecoder avroDecoder;
>>>>>     static {
>>>>>         final Properties props = new Properties();
>>>>>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>>>>> "kafka:9092");
>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>> "http://registry:8081");
>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>> true);
>>>>>         VerifiableProperties vProps = new VerifiableProperties(props);
>>>>>         avroDecoder = new KafkaAvroDecoder(vProps);
>>>>>     }
>>>>>
>>>>>     public static void main(String[] args) {
>>>>>
>>>>>         PipelineOptions options = PipelineOptionsFactory.create();
>>>>>         Pipeline p = Pipeline.create(options);
>>>>>
>>>>>         p.apply(KafkaIO.<byte[], byte[]>read()
>>>>>                 .withBootstrapServers("kafka:9092")
>>>>>                 .withTopic("dbserver1.inventory.customers")
>>>>>                 .withKeyDeserializer(ByteArrayDeserializer.class)
>>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>>                 .withoutMetadata())
>>>>>                 .apply(Values.<byte[]>create())
>>>>>                 .apply("ParseAvro", ParDo.of(new DoFn<byte[],
>>>>> Envelope>() {
>>>>>                     @ProcessElement
>>>>>                     public void processElement(ProcessContext c) {
>>>>>                         Envelope data = (Envelope)
>>>>> avroDecoder.fromBytes(c.element());
>>>>>                         c.output(data);
>>>>>                     }
>>>>>                 }))
>>>>>
>>>>> Thanks,
>>>>> Andrew
>>>>>
>>>>> On Wed, Oct 18, 2017, at 06:40 PM, Raghu Angadi wrote:
>>>>>
>>>>> On Wed, Oct 18, 2017 at 10:35 AM, Eugene Kirpichov <
>>>>> kirpic...@google.com> wrote:
>>>>>
>>>>> It seems that KafkaAvroDeserializer implements Deserializer<Object>,
>>>>> though I suppose with proper configuration that Object will at run-time be
>>>>> your desired type. Have you tried adding some Java type casts to make it
>>>>> compile?
>>>>>
>>>>>
>>>>> +1, cast might be the simplest fix. Alternately you can wrap or
>>>>> extend KafkaAvroDeserializer as Tim suggested. It would cast the Object
>>>>> returned by KafkaAvroDeserializer::deserialize() to Envelope at
>>>>> runtime.
>>>>>
>>>>>
>>>>> On Wed, Oct 18, 2017 at 7:26 AM Tim Robertson <
>>>>> timrobertson...@gmail.com> wrote:
>>>>>
>>>>> I just tried quickly and see the same as you Andrew.
>>>>> We're missing something obvious or else extending
>>>>> KafkaAvroDeserializer seems necessary, right?
>>>>>
>>>>> On Wed, Oct 18, 2017 at 3:14 PM, Andrew Jones <
>>>>> andrew+b...@andrew-jones.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to read Avro data from a Kafka stream using KafkaIO. I think
>>>>> it should be as simple as:
>>>>>
>>>>> p.apply(KafkaIO.<String, Envelope>read()
>>>>>   .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
>>>>>   AvroCoder.of(Envelope.class))
>>>>>
>>>>> Where Envelope is the name of the Avro class. However, that does not
>>>>> compile and I get the following error:
>>>>>
>>>>> incompatible types:
>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroDeserializer>
>>>>> cannot be converted to java.lang.Class<? extends
>>>>> org.apache.kafka.common.serialization.Deserializer<
>>>>> dbserver1.inventory.customers.Envelope>>
>>>>>
>>>>> I've tried a number of variations on this theme but haven't yet worked
>>>>> it out and am starting to run out of ideas...
>>>>>
>>>>> Has anyone successfully read Avro data from Kafka?
>>>>>
>>>>> The code I'm using can be found at
>>>>> https://github.com/andrewrjones/debezium-kafka-beam-example and a full
>>>>> environment can be created with Docker.
>>>>>
>>>>> Thanks,
>>>>> Andrew
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
