Hello all, I am trying to consume an Avro-formatted message using the confluent.schemaregistry.client based serializer. The code is in Python.
The message was posted to the Kafka topic using the Confluent Avro encoder. But when I try to consume the message using KafkaConsumer, I get the following error, thrown by the iterator step "for msg in consumer":

    assert has_snappy(), 'Snappy decompression unsupported'
    AssertionError: Snappy decompression unsupported

Questions:

1. Why is the Kafka consumer not using the value_deserializer as specified in the code? It looks like it is interpreting the message directly and not using the myserializer method. So the header bytes added by the Confluent serializer during encoding (a magic byte followed by a 4-byte schema ID) are seen as compression.
2. Is there any other configuration that I should do?

Listed below is the code:

    from kafka import KafkaConsumer
    from confluent.schemaregistry.client import CachedSchemaRegistryClient
    from confluent.schemaregistry.serializers import MessageSerializer

    # Serializer backed by the schema registry
    schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xx.xx.xx:8081')
    serializer = MessageSerializer(schema_registry_client)

    def myserializer(msg):
        # Decode the Confluent-framed Avro payload into a Python object
        decoded_msg = serializer.decode_message(msg)
        return decoded_msg

    # To consume messages
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=brokers,
                             value_deserializer=myserializer)
    for msg in consumer:
        print("topic is " + msg.topic)

~Muthu
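
For reference, here is a minimal sketch of the framing that the Confluent serializer prepends during encoding, as mentioned in question 1: one magic byte (0) followed by a 4-byte big-endian schema ID, then the Avro binary payload. The helper name split_confluent_header and the sample bytes are hypothetical and only illustrate the layout. This is not the library's own decoding code, and it does not touch Kafka's message-level compression.

```python
import struct

MAGIC_BYTE = 0  # marker byte at the start of a Confluent-framed message


def split_confluent_header(raw):
    """Split a Confluent-framed message into (schema_id, avro_payload).

    Hypothetical helper for illustration only.
    """
    if len(raw) < 5:
        raise ValueError("message too short for a Confluent header")
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, raw[5:]


# Made-up sample: schema ID 42 followed by arbitrary payload bytes
sample = struct.pack(">bI", MAGIC_BYTE, 42) + b"\x02\x06foo"
schema_id, payload = split_confluent_header(sample)
```

Running a check like this on msg.value without a value_deserializer would show whether the raw bytes reaching the consumer still carry this header.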