Hello All,

I am trying to consume an Avro-formatted message using a deserializer built on the
confluent.schemaregistry.client package. The code is in Python.

The message was posted to the Kafka topic using the Confluent Avro encoder, but
when I try to consume it with KafkaConsumer, the iterator step
"for msg in consumer" throws the following error:


assert has_snappy(), 'Snappy decompression unsupported'

AssertionError: Snappy decompression unsupported


Questions:
1. Why is the KafkaConsumer not applying the value_deserializer specified in
the code? It appears to interpret the raw message directly instead of passing it
through the myserializer function, so the magic byte and 4-byte schema ID that the
Confluent serializer prepends during encoding are being read as a compression flag.
2. Is there any other configuration I should set? Listed below is the code:
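For reference, this is the header layout I believe the Confluent serializer prepends. A minimal sketch of parsing it by hand; parse_confluent_header is a hypothetical helper for illustration, not part of any library:

```python
import struct

def parse_confluent_header(raw_bytes):
    # Confluent wire format: magic byte 0, then a 4-byte big-endian
    # schema-registry ID, then the Avro-encoded payload.
    magic, schema_id = struct.unpack('>bI', raw_bytes[:5])
    return magic, schema_id, raw_bytes[5:]

# Example: a value encoded against schema ID 42
raw = b'\x00' + struct.pack('>I', 42) + b'avro-payload'
magic, schema_id, payload = parse_confluent_header(raw)
```

So the bytes at the front of each value are a schema pointer, not compression metadata.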

from kafka import KafkaConsumer
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer

schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xx.xx.xx:8081')
serializer = MessageSerializer(schema_registry_client)

def myserializer(msg):
    # Decode the Confluent-Avro-encoded message value via the schema registry
    decoded_msg = serializer.decode_message(msg)
    return decoded_msg

# To consume messages (topic and brokers are defined elsewhere)
consumer = KafkaConsumer(topic,
                         bootstrap_servers=brokers,
                         value_deserializer=myserializer)

for msg in consumer:
    print("topic is " + msg.topic)




~Muthu

