I'm working on adding gzip support to the Python client, and I'm running into 
some issues. I think I might not understand exactly how the compression is 
supposed to be implemented.

My initial approach was to set the compression byte to 1 to indicate gzip and
then simply gzip the payload. Here is an example request sent to Kafka, with a
byte-by-byte breakdown. I am sending the payload "test" to topic "my-topic",
partition 0:

\x00\x00\x006\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00"\x00\x00\x00\x1e\x01\x011\xc6\xb4\x08\x1f\x8b\x08\x00\x97\x91cP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00

\x00 \x00 \x00 6  
'--------------'
 request length = 54

\x00 \x00
'-------'
  request type = 0 (produce)

\x00 \x08 m y - t o p i c
'-------' '-------------'
  len = 8     topic

\x00 \x00 \x00 \x00
'-----------------'
  partition = 0
  
\x00 \x00 \x00 "
'--------------'
 messageset length = 34

\x00 \x00 \x00 \x1e
'-----------------'
  message length = 30

\x01 magic = 1
\x01 compression = 1
\xbb\x83\x82\xe0 checksum = -1149009184

\x1f\x8b\x08\x00\x8b\x8acP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00 =
"test" gzipped, length = 24

(The breakdown is from a second capture, so the gzip mtime bytes and the
checksum differ slightly from the dump at the top; the structure and lengths
are identical.)

This all seems fine to me, but Kafka is throwing a strange error message:

[2012-09-26 19:36:55,253] TRACE 54 bytes read from /127.0.0.1:64196 
(kafka.network.Processor)
[2012-09-26 19:36:55,253] TRACE Handling produce request from /127.0.0.1:64196 
(kafka.request.logger)
[2012-09-26 19:36:55,256] TRACE Producer request ProducerRequest(my-topic,0,34) 
(kafka.request.logger)
[2012-09-26 19:36:55,259] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE Remaining bytes in iterator = 30 
(kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE size of data = 30 
(kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,264] DEBUG Message is compressed. Valid byte count = 0 
(kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] DEBUG makeNext() in internalIterator: innerDone = 
true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE Remaining bytes in iterator = 0 
(kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE size of data = 1952805748 
(kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,277] ERROR Error processing ProduceRequest on my-topic:0 
(kafka.server.KafkaRequestHandlers)
kafka.common.InvalidMessageSizeException: invalid message size: 1952805748 only 
received bytes: 0 at 0( possible causes (1) a single message larger than the 
fetch size; (2) log corruption )
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:120)
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
        at 
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:149)
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
        at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
        at 
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
        at scala.collection.Iterator$class.foreach(Iterator.scala:631)
        at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
        at kafka.message.MessageSet.foreach(MessageSet.scala:87)
        at kafka.log.Log.append(Log.scala:205)
        at 
kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
        at 
kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
        at 
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
        at 
kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
        at kafka.network.Processor.handle(SocketServer.scala:296)
        at kafka.network.Processor.read(SocketServer.scala:319)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:680)

The strangest part is that the "invalid message size" 1952805748 is exactly the
decompressed payload "test" read as a big-endian int32. Any ideas?
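A quick sanity check confirms that interpretation:

```python
import struct

# b"test" = 0x74 0x65 0x73 0x74, read as a big-endian signed int32
size = struct.unpack(">i", b"test")[0]
print(size)  # 1952805748
```

So the broker seems to be decompressing my message and then trying to parse the decompressed bytes as if they started with another size field.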

-David
