[ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13424952#comment-13424952 ]

Jun Rao commented on KAFKA-411:
-------------------------------

Thanks for the patch. It may not be the right fix, though, since it addresses 
the symptom but not the cause. For each produce request, the broker does the 
following:

(1) read all bytes of the request into a BoundedByteBufferReceive 
(SocketServer.read);
(2) once all bytes of the request are ready, deserialize them into a 
ProducerRequest (KafkaRequestHandlers.handleProducerRequest);
(3) finally, serve the request by appending the topic data to the logs.
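Roughly, those three stages look like the sketch below. This is only an 
illustration: SimpleProducerRequest and the wire layout are simplified 
placeholders, not the actual 0.7 classes or request format.

import java.nio.ByteBuffer

object ProduceRequestPipeline {

  // Step 1: the socket layer accumulates all bytes of one request
  // (BoundedByteBufferReceive inside SocketServer.read in 0.7).
  def readRequest(bytes: Array[Byte]): ByteBuffer =
    ByteBuffer.wrap(bytes)

  // Step 2: the handler turns those bytes into a request object
  // (KafkaRequestHandlers.handleProducerRequest). The type and layout
  // below are stand-ins for illustration only.
  case class SimpleProducerRequest(topic: String, payload: Array[Byte])

  def deserialize(buffer: ByteBuffer): SimpleProducerRequest = {
    val topicLen   = buffer.getShort().toInt           // length-prefixed topic name
    val topicBytes = new Array[Byte](topicLen)
    buffer.get(topicBytes)
    val payload    = new Array[Byte](buffer.remaining())
    buffer.get(payload)
    SimpleProducerRequest(new String(topicBytes, "UTF-8"), payload)
  }

  // Step 3: serve the request by appending the topic data to the log.
  def serve(req: SimpleProducerRequest, append: (String, Array[Byte]) => Unit): Unit =
    append(req.topic, req.payload)
}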

What you observed is that in step 3, a topic name is somehow corrupted. 
However, this means that the corresponding ProducerRequest is corrupted. 
Assuming there is no corruption at the network layer (very unlikely), the 
corruption must have happened in step 1 or step 2. So, instead of patching a 
corrupted topic name, we should understand why a ProducerRequest can be 
corrupted and fix the cause. BTW, whatever caused the corrupted topic could be 
causing the corrupted messages too.
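One way to narrow this down (a purely hypothetical diagnostic, not existing 
Kafka code) would be to validate the topic right after deserialization and, 
when it looks wrong, dump the first raw bytes of the request: if the raw 
bytes are already garbage, step 1 (or the client) is suspect; if they look 
sane, the deserialization in step 2 is.

import java.nio.ByteBuffer

object TopicCorruptionCheck {

  // Hypothetical check: flag topic names that do not look like ordinary
  // topic strings, and log a hex dump of the raw request for comparison.
  def check(topic: String, rawRequest: ByteBuffer): Unit = {
    val looksValid = topic.nonEmpty &&
      topic.forall(c => c.isLetterOrDigit || c == '-' || c == '_' || c == '.')
    if (!looksValid) {
      val copy  = rawRequest.duplicate()
      val bytes = new Array[Byte](copy.remaining())
      copy.get(bytes)
      val hex = bytes.take(64).map(b => "%02x".format(b)).mkString(" ")
      println(s"Suspicious topic '$topic', first raw bytes: $hex")
    }
  }
}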

                
> Message Error in highly concurrent environment
> ----------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>              Labels: InvalidTopic
>             Fix For: 0.8, 0.7.2
>
>         Attachments: InvalidTopic.patch
>
>
> In a highly concurrent environment, these errors always appear in the Kafka broker:
> ERROR Error processing MultiProducerRequest on bxx:2 
> (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression codec: 
> NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>       at 
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>       at 
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>       at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>       at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>       at kafka.log.Log.append(Log.scala:205)
>       at 
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>       at 
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>       at 
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>       at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>       at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>       at 
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>       at 
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>       at 
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>       at kafka.network.Processor.handle(SocketServer.scala:296)
>       at kafka.network.Processor.read(SocketServer.scala:319)
>       at kafka.network.Processor.run(SocketServer.scala:214)
>       at java.lang.Thread.run(Thread.java:722)
>  ERROR Closing socket for /192.168.75.15 because of error 
> (kafka.network.Processor)
> kafka.common.InvalidTopicException: topic name can't be empty
>       at kafka.log.LogManager.getLogPool(LogManager.scala:159)
>       at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195)
