I'm working on adding gzip support to the Python client, and I'm running into
some issues. I think I might not understand exactly how the compression is
supposed to be implemented.
My initial approach was to set the compression byte to 1 to indicate gzip and
then simply gzip the payload. Here is an example request sent to Kafka, with a
byte-by-byte breakdown; I am sending the payload "test" to "my-topic",
partition 0:
\x00\x00\x006\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00"\x00\x00\x00\x1e\x01\x011\xc6\xb4\x08\x1f\x8b\x08\x00\x97\x91cP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00
\x00 \x00 \x00 6
'--------------'
request length = 54

\x00 \x00
'-------'
type = 0

\x00 \x08  m y - t o p i c
'-------'  '-------------'
 len = 8       topic

\x00 \x00 \x00 \x00
'-----------------'
partition = 0

\x00 \x00 \x00 "
'--------------'
messageset length = 34

\x00 \x00 \x00 \x1e
'-----------------'
message length = 30

\x01              magic = 1
\x01              compression = 1
\xbb\x83\x82\xe0  checksum = -1149009184
\x1f\x8b\x08\x00\x8b\x8acP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00
                  = "test" gzipped, length = 24
This all seems fine to me, but Kafka is throwing a strange error message:
[2012-09-26 19:36:55,253] TRACE 54 bytes read from /127.0.0.1:64196 (kafka.network.Processor)
[2012-09-26 19:36:55,253] TRACE Handling produce request from /127.0.0.1:64196 (kafka.request.logger)
[2012-09-26 19:36:55,256] TRACE Producer request ProducerRequest(my-topic,0,34) (kafka.request.logger)
[2012-09-26 19:36:55,259] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE Remaining bytes in iterator = 30 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE size of data = 30 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,264] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE Remaining bytes in iterator = 0 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE size of data = 1952805748 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,277] ERROR Error processing ProduceRequest on my-topic:0 (kafka.server.KafkaRequestHandlers)
kafka.common.InvalidMessageSizeException: invalid message size: 1952805748 only received bytes: 0 at 0 (possible causes: (1) a single message larger than the fetch size; (2) log corruption)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:120)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:149)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at kafka.message.MessageSet.foreach(MessageSet.scala:87)
    at kafka.log.Log.append(Log.scala:205)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:680)
The strangest part is that the "invalid message size" of 1952805748 is the
decompressed payload "test" itself, interpreted as a big-endian int32. Any
ideas?
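That interpretation is easy to verify with Python's struct module:

```python
import struct

# The bytes of "test" read as a single big-endian 32-bit integer
# (0x74657374) give exactly the "size" in the error message.
size = struct.unpack(">i", b"test")[0]
print(size)  # 1952805748
```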
-David