[ https://issues.apache.org/jira/browse/KAFKA-411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jian fan updated KAFKA-411:
---------------------------

    Attachment: InvalidTopic.patch

In a high-concurrency environment, the TCP server drops some packets when the TCP buffer is full. So there is some chance that "topic" contains one or more characters that encode to bytes that include NULL (0).

> Message Error in high-concurrency environment
> ---------------------------------------------
>
>                 Key: KAFKA-411
>                 URL: https://issues.apache.org/jira/browse/KAFKA-411
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7
>            Reporter: jian fan
>            Priority: Blocker
>              Labels: InvalidTopic
>             Fix For: 0.8, 0.7.2
>
>         Attachments: InvalidTopic.patch
>
>
> In a high-concurrency environment, these errors always appear in the Kafka broker:
> ERROR Error processing MultiProducerRequest on bxx:2 (kafka.server.KafkaRequestHandlers)
> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1030 curr offset: 1034 init offset: 0
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>         at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>         at kafka.log.Log.append(Log.scala:205)
>         at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>         at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>         at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>         at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>         at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>         at kafka.network.Processor.handle(SocketServer.scala:296)
>         at kafka.network.Processor.read(SocketServer.scala:319)
>         at kafka.network.Processor.run(SocketServer.scala:214)
>         at java.lang.Thread.run(Thread.java:722)
> ERROR Closing socket for /192.168.75.15 because of error (kafka.network.Processor)
> kafka.common.InvalidTopicException: topic name can't be empty
>         at kafka.log.LogManager.getLogPool(LogManager.scala:159)
>         at kafka.log.LogManager.getOrCreateLog(LogManager.scala:195)
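
The comment above suggests that a damaged request can yield a topic string that is empty or contains NULL (0) bytes. As a rough illustration of the kind of check that would reject such names, here is a minimal sketch in Python. It is not the content of InvalidTopic.patch, which is not shown in this message, and it is not Kafka's own code: the function name `is_valid_topic` and the rule of rejecting all control characters are assumptions made for the example. Only the empty-name check is evidenced by the trace ("topic name can't be empty").

```python
def is_valid_topic(topic: str) -> bool:
    """Hypothetical check: reject empty names and names with control characters.

    The empty-name rule mirrors the InvalidTopicException in the trace; the
    control-character rule (which covers NUL, code point 0) is an assumption.
    """
    if not topic:
        return False
    # Code points below 0x20 and 0x7F (DEL) are ASCII control characters.
    return all(ord(ch) >= 0x20 and ord(ch) != 0x7F for ch in topic)


if __name__ == "__main__":
    for name in ["events", "", "ev\x00ents"]:
        print(repr(name), is_valid_topic(name))
```

A check like this only filters out bad topic names after the fact; it would not address whatever causes the request bytes to be damaged in the first place.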