I'm working on adding gzip support to the Python client, and I'm running into some issues. I think I might not understand exactly how the compression is supposed to be implemented.
My initial approach was to set the compression byte to 1 to indicate gzip, and then simply gzip the payload. Here is an example request sent to Kafka, with a byte-by-byte breakdown. I am sending the payload "test" to "my-topic", partition 0:

\x00\x00\x006\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00"\x00\x00\x00\x1e\x01\x011\xc6\xb4\x08\x1f\x8b\x08\x00\x97\x91cP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00

\x00 \x00 \x00 6
'--------------'
request length = 54

\x00 \x00
'-------'
type = 0

\x00 \x08   m y - t o p i c
'-------'  '---------------'
 len = 8        topic

\x00 \x00 \x00 \x00
'-----------------'
partition = 0

\x00 \x00 \x00 "
'--------------'
messageset length = 34

\x00 \x00 \x00 \x1e
'-----------------'
message length = 30

\x01
magic = 1

\x01
compression = 1

\xbb\x83\x82\xe0
checksum = -1149009184

\x1f\x8b\x08\x00\x8b\x8acP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00
= "test" gzipped, length = 24

This all seems fine to me, but Kafka is throwing a strange error:

[2012-09-26 19:36:55,253] TRACE 54 bytes read from /127.0.0.1:64196 (kafka.network.Processor)
[2012-09-26 19:36:55,253] TRACE Handling produce request from /127.0.0.1:64196 (kafka.request.logger)
[2012-09-26 19:36:55,256] TRACE Producer request ProducerRequest(my-topic,0,34) (kafka.request.logger)
[2012-09-26 19:36:55,259] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE Remaining bytes in iterator = 30 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE size of data = 30 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,264] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE Remaining bytes in iterator = 0 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE size of data = 1952805748 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,277] ERROR Error processing ProduceRequest on my-topic:0 (kafka.server.KafkaRequestHandlers)
kafka.common.InvalidMessageSizeException: invalid message size: 1952805748 only received bytes: 0 at 0 (possible causes (1) a single message larger than the fetch size; (2) log corruption)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:120)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:149)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:87)
    at kafka.log.Log.append(Log.scala:205)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:680)

The strangest part is that the "invalid message size" 1952805748 is exactly the decompressed payload "test" interpreted as a big-endian int32 (0x74657374). Any ideas?

-David
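For reference, here is a minimal Python sketch of the encoding I described above — not my actual client code; `encode_produce_request` is an illustrative name, and I am assuming the checksum field is a CRC32 over the compressed payload bytes:

```python
import gzip
import struct
import zlib

def encode_produce_request(topic: str, partition: int, payload: bytes) -> bytes:
    """Sketch of the framing shown in the byte-by-byte breakdown: a produce
    request (type 0) carrying a single message whose payload is the gzipped
    input. Magic byte 1 means a compression attribute byte follows it."""
    compressed = gzip.compress(payload)  # my approach: just gzip the raw payload
    # magic = 1, compression = 1 (gzip), then CRC32 of the (compressed) payload
    # as a big-endian 32-bit value, then the compressed bytes themselves.
    message = struct.pack(">BBI", 1, 1, zlib.crc32(compressed) & 0xFFFFFFFF) + compressed
    # The message set is just the length-prefixed message.
    message_set = struct.pack(">i", len(message)) + message
    body = (
        struct.pack(">h", 0)                              # request type 0 = produce
        + struct.pack(">h", len(topic)) + topic.encode()  # topic length + topic
        + struct.pack(">i", partition)                    # partition
        + struct.pack(">i", len(message_set))             # message set length
        + message_set
    )
    # Whole request is prefixed with its own length (54 in the example above).
    return struct.pack(">i", len(body)) + body
```

With `encode_produce_request("my-topic", 0, b"test")` this produces the same shape as the dump above: a 4-byte length prefix, the type and topic fields, then a 34-byte message set containing one 30-byte compressed message.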