Hi All,

Currently, I'm working on a use case wherein I have to deserialize an Avro
object and convert it to a different Avro format. Below is the flow.

DB -> Source Topic(Avro format) -> Stream Processor -> Target Topic (Avro
as nested object).

When I deserialize the message from the Source Topic, the exception below
is thrown.

Could someone help me resolve this issue?

2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer     : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer     : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:29:34.218  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[confluent-kafka-poc-client-StreamThread-1] State transition from
PARTITIONS_ASSIGNED to RUNNING
2020-05-05 10:29:34.219  INFO 13804 --- [-StreamThread-1]
org.apache.kafka.streams.KafkaStreams    : stream-client
[confluent-kafka-poc-client] State transition from REBALANCING to RUNNING
2020-05-05 10:29:34.220  INFO 13804 --- [-StreamThread-1]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
groupId=confluent-kafka-poc] Found no committed offset for partition
DEMO-poc-0
2020-05-05 10:29:34.228  INFO 13804 --- [-StreamThread-1]
o.a.k.c.c.internals.SubscriptionState    : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-consumer,
groupId=confluent-kafka-poc] Resetting offset for partition DEMO-poc-0 to
offset 0.
2020-05-05 10:30:12.886 ERROR 13804 --- [-StreamThread-1]
o.a.k.s.e.LogAndFailExceptionHandler     : Exception caught during
Deserialization, taskId: 0_0, topic: DEMO-poc, partition: 0, offset: 0

org.apache.kafka.common.errors.SerializationException: Error deserializing
Avro message for id 1421

*Caused by: org.apache.kafka.common.errors.SerializationException: Could
not find class "XYZ-Table" specified in writer's schema whilst finding
reader's schema for a SpecificRecord.*
2020-05-05 10:30:12.888 ERROR 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[confluent-kafka-poc-client-StreamThread-1] Encountered the following
unexpected Kafka exception during processing, this usually indicate Streams
internal errors:

org.apache.kafka.streams.errors.StreamsException: Deserialization exception
handler is set to fail upon a deserialization error. If you would rather
have the streaming pipeline continue after a deserialization error, please
set the default.deserialization.exception.handler appropriately.
        at
org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
~[kafka-streams-5.3.0-ccs.jar!/:na]
        at
org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
~[kafka-streams-5.3.0-ccs.jar!/:na]
        at
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
~[kafka-streams-5.3.0-ccs.jar!/:na]
        at
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
~[kafka-streams-5.3.0-ccs.jar!/:na]
        at
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
~[kafka-streams-5.3.0-ccs.jar!/:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
~[kafka-streams-5.3.0-ccs.jar!/:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
~[kafka-streams-5.3.0-ccs.jar!/:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
~[kafka-streams-5.3.0-ccs.jar!/:na]
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
~[kafka-streams-5.3.0-ccs.jar!/:na]
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 1421
Caused by: org.apache.kafka.common.errors.SerializationException: Could not
find class "XYZ-Table" specified in writer's schema whilst finding reader's
schema for a SpecificRecord.

2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[confluent-kafka-poc-client-StreamThread-1] State transition from RUNNING
to PENDING_SHUTDOWN
2020-05-05 10:30:12.888  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[confluent-kafka-poc-client-StreamThread-1] Shutting down
2020-05-05 10:30:12.891  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.consumer.KafkaConsumer     : [Consumer
clientId=confluent-kafka-poc-client-StreamThread-1-restore-consumer,
groupId=null] Unsubscribed all topics or patterns and assigned partitions
2020-05-05 10:30:12.891  INFO 13804 --- [-StreamThread-1]
o.a.k.clients.producer.KafkaProducer     : [Producer
clientId=confluent-kafka-poc-client-StreamThread-1-producer] Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[confluent-kafka-poc-client-StreamThread-1] State transition from
PENDING_SHUTDOWN to DEAD
2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1]
org.apache.kafka.streams.KafkaStreams    : stream-client
[confluent-kafka-poc-client] State transition from RUNNING to ERROR
2020-05-05 10:30:12.895 ERROR 13804 --- [-StreamThread-1]
org.apache.kafka.streams.KafkaStreams    : stream-client
[confluent-kafka-poc-client] All stream threads have died. The instance
will be in error state and should be closed.
2020-05-05 10:30:12.895  INFO 13804 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[confluent-kafka-poc-client-StreamThread-1] Shutdown complete
Exception in thread "confluent-kafka-poc-client-StreamThread-1"
org.apache.kafka.streams.errors.StreamsException: Deserialization exception
handler is set to fail upon a deserialization error. If you would rather
have the streaming pipeline continue after a deserialization error, please
set the default.deserialization.exception.handler appropriately.
        at
org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
        at
org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:158)
        at
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:100)
        at
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
        at
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:746)
        at
org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 1421
Caused by: org.apache.kafka.common.errors.SerializationException: Could not
find class "XYZ-Table"  specified in writer's schema whilst finding
reader's schema for a SpecificRecord.
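
One observation that may be relevant: the message says the deserializer could not find a class named "XYZ-Table" while looking for a SpecificRecord, so it appears to be using the writer's schema name as a Java class name. If the real record name also contains a hyphen (the name shown here may differ from the actual one), it could never match a Java class, because a hyphen is not allowed in a Java identifier. Below is a minimal sketch that only illustrates this naming point using the JDK. The class NameCheck and its method are hypothetical helpers, not part of my application or of any Kafka/Avro API.

```java
public class NameCheck {

    // Returns true if s consists only of characters allowed in a Java identifier.
    // Note: this does not reject reserved words such as "class"; it only checks characters.
    public static boolean isJavaIdentifier(String s) {
        if (s == null || s.isEmpty() || !Character.isJavaIdentifierStart(s.charAt(0))) {
            return false;
        }
        for (int i = 1; i < s.length(); i++) {
            if (!Character.isJavaIdentifierPart(s.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Name taken from the exception message above.
        System.out.println("XYZ-Table -> " + isJavaIdentifier("XYZ-Table"));
        // Hypothetical alternative with an underscore, for comparison only.
        System.out.println("XYZ_Table -> " + isJavaIdentifier("XYZ_Table"));
    }
}
```

If this is indeed the cause, I assume the options would be either a record name in the source schema that maps to a generated class on the classpath, or reading the source topic as a generic record instead of a SpecificRecord, but I have not verified either.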

Thanks
C Suresh
