When specific.avro.reader is set to true, the deserializer tries to instantiate
a class for the record. It forms the class name by reading the writer schema
from the schema registry and concatenating the record's namespace and name.
Here it is trying to load that class and cannot find it on the classpath. I am
not sure, though, how it came up with the name XYZ-Table. Checking the
namespace and name of the record in the schema registry, and making the
matching class available on the classpath, should solve it. This is my
understanding. I may be wrong!
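
To make the lookup described above concrete, here is a minimal sketch in plain Java. It is not the Confluent deserializer's actual code; the class SpecificClassLookup and its two helper methods are hypothetical names made up for illustration. It only mimics the idea: join namespace and record name, then ask the class loader for that class.

```java
public class SpecificClassLookup {

    // Hypothetical helper: build the candidate class name from the
    // writer schema's namespace and record name.
    static String fullName(String namespace, String name) {
        if (namespace == null || namespace.isEmpty()) {
            return name;
        }
        return namespace + "." + name;
    }

    // Hypothetical helper: report whether a class with that name can be
    // loaded. The class is looked up without running its static initializers.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false,
                    SpecificClassLookup.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the record in the registry has no namespace and is
        // named "XYZ-Table", as the error message suggests.
        String candidate = fullName("", "XYZ-Table");
        System.out.println(candidate + " on classpath: "
                + isOnClasspath(candidate));
    }
}
```

Note that a hyphen is not legal in a Java identifier, so no generated class could ever be called XYZ-Table. That is one more reason to check what namespace and name the registered schema really carries.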

Nagendra

> On May 5, 2020, at 11:12 AM, Suresh Chidambaram <chida.sur...@gmail.com> 
> wrote:
> 
> Hi All,
> 
> Currently, I'm working on a use case wherein I have to deserialize an Avro
> object and convert it to some other Avro format. Below is the flow.
> 
> DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro
> as nested object).
> 
> When I deserialize the message from the Source Topic, the below exception
> is thrown.
> 
> Could someone help me resolve this issue?
> 
> 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> o.a.k.clients.consumer.KafkaConsumer     : [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
> groupId=null] Unsubscribed all topics or patterns and assigned partitions
> 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> o.a.k.clients.consumer.KafkaConsumer     : [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
> groupId=null] Unsubscribed all topics or patterns and assigned partitions
> 2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [confluent-kafka-poc-client-StreamThread-1] State transition from
> PARTITIONS_ASSIGNED to RUNNING
> 2020-05-05 10:29:34.219  INFO 13804 --- [-StreamThread-1]
> org.apache.kafka.streams.KafkaStreams    : stream-client
> [confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
> 2020-05-05 10:29:34.220  INFO 13804 --- [-StreamThread-1]
> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
> groupId=confluent-kafka-poc] Found no committed offset for partition
> DEMO-poc-0
> 2020-05-05 10:29:34.228  INFO 13804 --- [-StreamThread-1]
> o.a.k.c.c.internals.SubscriptionState    : [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
> groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to
> offset 0.
> 2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1]
> o.a.k.s.e.LogAndFailExceptionHandler     : Exception caught during
> Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0
> 
> org.apache.kafka.common.errors.SerializationException: Error deserializing
> Avro message for id 1421
> 
> *Caused by: org.apache.kafka.common.errors.SerializationException: Could
> not find class "XYZ-Table" specified in writer's schema whilst finding
> reader's schema for a SpecificRecord.*
> 2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [confluent-kafka-poc-client-StreamThread-1] Encountered the following
> unexpected Kafka exception during processing, this usually indicate Streams
> internal errors:
> 
> org.apache.kafka.streams.errors.StreamsException: Deserialization exception
> handler is set to fail upon a deserialization error. If you would rather
> have the streaming pipeline continue after a deserialization error, please
> set the default.deserialization.exception.handler appropriately.
>        at
> org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>        at
> org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>        at
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>        at
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>        at
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> ~[kafka-streams-5.3.0-ccs.jar!/:na]
> Caused by: org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id 1421
> Caused by: org.apache.kafka.common.errors.SerializationException: Could not
> find class "XYZ-Table" specified in writer's schema whilst finding reader's
> schema for a SpecificRecord.
> 
> 2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING
> to PENDING_SHUTDOWN
> 2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [confluent-kafka-poc-client-StreamThread-1] Shutting down
> 2020-05-05 10:30:12.891  INFO 13804 --- [-StreamThread-1]
> o.a.k.clients.consumer.KafkaConsumer     : [Consumer
> clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
> groupId=null] Unsubscribed all topics or patterns and assigned partitions
> 2020-05-05 10:30:12.891  INFO 13804 --- [-StreamThread-1]
> o.a.k.clients.producer.KafkaProducer     : [Producer
> clientId=confluent-kafka-poc-client-StreamThread-1-producer] Closing the
> Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [confluent-kafka-poc-client-StreamThread-1] State transition from
> PENDING_SHUTDOWN to DEAD
> 2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1]
> org.apache.kafka.streams.KafkaStreams    : stream-client
> [confluent-kafka-poc-client] State transition from RUNNING to ERROR
> 2020-05-05 10:30:12.895 ERROR 13804 --- [-StreamThread-1]
> org.apache.kafka.streams.KafkaStreams    : stream-client
> [confluent-kafka-poc-client] All stream threads have died. The instance
> will be in error state and should be closed.
> 2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1]
> o.a.k.s.p.internals.StreamThread         : stream-thread
> [confluent-kafka-poc-client-StreamThread-1] Shutdown complete
> Exception in thread "confluent-kafka-poc-client-StreamThread-1"
> org.apache.kafka.streams.errors.StreamsException: Deserialization exception
> handler is set to fail upon a deserialization error. If you would rather
> have the streaming pipeline continue after a deserialization error, please
> set the default.deserialization.exception.handler appropriately.
>        at
> org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
>        at
> org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
>        at
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
>        at
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
>        at
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
>        at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> Caused by: org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id 1421
> Caused by: org.apache.kafka.common.errors.SerializationException: Could not
> find class "XYZ-Table"  specified in writer's schema whilst finding
> reader's schema for a SpecificRecord.
> 
> Thanks
> C Suresh
