Looks like your producer is writing Avro specific records.

Can you read the records using the bundled console consumer? I think it will
be simpler for you to first get it returning valid records, and then use the
same deserializer config with your KafkaIO reader.
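
For example, a minimal standalone check along those lines (a plain Kafka
consumer, not Beam; the broker, topic, and registry addresses below are
placeholders):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Reads a few records with the same deserializer config you plan to give
    // KafkaIO, to confirm the topic actually returns valid Avro records.
    public class VerifyAvroRecords {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");           // placeholder
            props.put("group.id", "verify-avro");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            props.put("schema.registry.url", "http://localhost:8081");  // placeholder

            try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic")); // placeholder
                ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, Object> r : records) {
                    System.out.println(r.key() + " -> " + r.value());
                }
            }
        }
    }

If the values print as Avro records here, the same deserializer settings
should carry over to the KafkaIO reader.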

On Fri, Sep 28, 2018 at 9:33 AM Vishwas Bm <[email protected]> wrote:

> Hi Raghu,
>
> Thanks for the response. We are now trying with GenericAvroDeserializer
> but are still seeing issues.
> We have a producer which sends messages to Kafka in the format
> <String, GenericRecord>.
>
> Below is the code snippet we have used with Beam KafkaIO.
>
>     org.apache.avro.Schema schema = null;
>     try {
>         schema = new org.apache.avro.Schema.Parser().parse(new File("Schema path"));
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
>
>     KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>             KafkaIO.<String, GenericRecord>read()
>                     .withBootstrapServers(bootstrapServerUrl)
>                     .withTopic(topicName)
>                     .withKeyDeserializer(StringDeserializer.class)
>                     .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>                             AvroCoder.of(schema))
>                     .updateConsumerProperties(
>                             ImmutableMap.of("schema.registry.url", schemaUrl))
>                     .withTimestampPolicyFactory((tp, prevWatermark) ->
>                             new KafkaCustomTimestampPolicy(maxDelay, timestampInfo,
>                                     prevWatermark));
>
> Below is the error seen:
>
> Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>         at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
>         at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
>         at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
>         at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
>         at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
>         ... 8 more
> Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: interface org.apache.avro.generic.GenericRecord
>         at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
>         at org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:594)
>         at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
>         at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
>         at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
>         at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
>         at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
>         at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
>
>
> Can you provide some pointers on this?
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
>
> On Fri, Sep 28, 2018 at 3:12 AM Raghu Angadi <[email protected]> wrote:
>
>> It is a compilation error due to a type mismatch for the value type.
>>
>> Please match the key and value types of the KafkaIO reader. I.e., if you
>> have KafkaIO.<KeyType, ValueType>read(), 'withValueDeserializer()' needs a
>> class object which extends 'Deserializer<ValueType>'. Since
>> KafkaAvroDeserializer extends 'Deserializer<Object>', your ValueType needs
>> to be Object instead of String.
>>
>> Btw, it might be better to use GenericAvroDeserializer or
>> SpecificAvroDeserializer from the same package.
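>>
>> For illustration, a minimal sketch of that type matching (the broker and
>> topic names are placeholders; note that Beam will still need a coder for
>> the Object values, e.g. via withValueDeserializerAndCoder):
>>
>>     KafkaIO.Read<String, Object> read = KafkaIO.<String, Object>read()
>>             .withBootstrapServers("localhost:9092")
>>             .withTopic("my-topic")
>>             .withKeyDeserializer(StringDeserializer.class)
>>             // Compiles because KafkaAvroDeserializer is a Deserializer<Object>
>>             // and the ValueType here is Object.
>>             .withValueDeserializer(KafkaAvroDeserializer.class);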
>>
>>
>> On Thu, Sep 27, 2018 at 10:31 AM Vishwas Bm <[email protected]> wrote:
>>
>>>
>>> Hi Raghu,
>>>
>>> The deserializer is provided by the Confluent
>>> *io.confluent.kafka.serializers* package.
>>>
>>> When we set the value deserializer to KafkaAvroDeserializer, we get the
>>> error below:
>>>
>>>    The method withValueDeserializer(Class<? extends Deserializer<String>>)
>>>    in the type KafkaIO.Read<String,String> is not applicable for the
>>>    arguments (Class<KafkaAvroDeserializer>)
>>>
>>> From the error, it looks like Beam does not support this deserializer.
>>> Also, we wanted to use the Schema Registry from Confluent; is this
>>> supported in Beam?
>>>
>>>
>>> *Thanks & Regards,*
>>> *Vishwas *
>>>
>>>
>>> On Thu, Sep 27, 2018 at 10:28 PM Raghu Angadi <[email protected]> wrote:
>>>
>>>> You can set the key/value deserializers:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L101
>>>> What are the errors you see?
>>>>
>>>> Also note that Beam includes AvroCoder for handling Avro records.
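>>>>
>>>> For example, a minimal sketch (the schema file path is a placeholder, and
>>>> Schema.Parser().parse(File) can throw, so handle the exception as needed):
>>>>
>>>>     org.apache.avro.Schema schema =
>>>>             new org.apache.avro.Schema.Parser().parse(new File("/path/to/record.avsc"));
>>>>     // AvroCoder.of(schema) yields a Coder<GenericRecord> that can be handed
>>>>     // to KafkaIO's withValueDeserializerAndCoder(...).
>>>>     AvroCoder<GenericRecord> coder = AvroCoder.of(schema);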
>>>>
>>>> On Thu, Sep 27, 2018 at 6:05 AM rahul patwari <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have a use case to read data from Kafka serialized with
>>>>> KafkaAvroSerializer, where the schema is present in the Schema Registry.
>>>>>
>>>>> When we try to use io.confluent.kafka.serializers.KafkaAvroDeserializer
>>>>> as the value deserializer to get a GenericRecord, we are seeing errors.
>>>>>
>>>>> Does KafkaIO.read() support reading from the Schema Registry and using
>>>>> the Confluent KafkaAvroSerDe?
>>>>>
>>>>> Regards,
>>>>> Rahul
>>>>>
>>>>
