Jun: Hi. I found why the error appears. In a high-concurrency environment, the TCP server will drop some packets when the TCP buffer overflows. So there is a chance that "topic" contains one or more characters that encode to bytes that include NULL (0). I have submitted the patch to KAFKA-411, please check it!
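[Editor's note] As a rough illustration of the failure mode described above (this is a hedged sketch, not the actual KAFKA-411 patch; the method names and the 2-byte length prefix are assumptions modeled on Kafka 0.7's wire format), a broker-side check could reject any topic name whose decoded form contains a NUL byte:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TopicValidation {
    // Illustrative: read a length-prefixed (2-byte size + UTF-8 bytes) topic
    // name from a request buffer, mirroring how the topic is framed on the wire.
    public static String readTopic(ByteBuffer buf) {
        short size = buf.getShort();
        byte[] bytes = new byte[size];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Reject empty names and names containing NUL (0), which should only
    // appear if the request bytes were corrupted or mis-framed.
    public static boolean isValidTopic(String topic) {
        return !topic.isEmpty() && topic.indexOf('\0') < 0;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putShort((short) 2).put("ax".getBytes(StandardCharsets.UTF_8));
        buf.flip();
        String topic = readTopic(buf);
        System.out.println(topic + " valid=" + isValidTopic(topic));
        System.out.println("a\0x valid=" + isValidTopic("a\0x"));
    }
}
```

The point is that a corrupted topic should be rejected at request-parsing time, before it can reach LogManager.createLog and leave a bad directory on disk.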
Thanks!
Jian Fan

2012/7/30 Jun Rao <[email protected]>

> Jian,
>
> All log directories in Kafka are created by LogManager.createLog(). As you
> can see, the directory always has the form topic-partitionId. So it's not
> clear how a directory named "a" could be created in your case. I will try
> to rerun your test and see if it can be reproduced.
>
> Thanks,
>
> Jun
>
> On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <[email protected]> wrote:
>
> > Jay:
> >
> > You can try to send 600 thousand messages per second to the broker; you
> > will find that TCP drops packets, so sometimes the topic "ax" becomes
> > "a". I don't mean to solve the TCP problem at the application level; I
> > just think there may be a bug in file.mkdir() in LogManager.createLog.
> > It will affect Kafka usage.
> >
> > Thanks
> > Jian Fan
> >
> > 2012/7/29 Jay Kreps <[email protected]>
> >
> > > Hmm, that is not my understanding of TCP. TCP is a reliable protocol,
> > > so it is supposed to either deliver packets in order or time out
> > > retrying. In the case of the topic name, which is a size-delimited
> > > string, there should be no way for it to drop a single byte in the
> > > middle of the request like that. If that is in fact happening, I don't
> > > think it is something we can hope to recover from at the application
> > > level...
> > >
> > > -Jay
> > >
> > > On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <[email protected]> wrote:
> > >
> > > > Jun:
> > > >
> > > > Dropping packets in TCP is an issue of the OS/JVM, but it can also
> > > > cause Kafka issues. For example, the topic of a message is "ax", but
> > > > it can change to "a" in the broker because some packets are dropped.
> > > > The log directories should be named like a-0, a-1, a-2 and so on,
> > > > but file.mkdir() creates a directory named just "a". There seems to
> > > > be a bug in the file.mkdir() call in LogManager.createLog. If you
> > > > shut down the broker and restart it, the broker will report an
> > > > exception like this:
> > > >
> > > > [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
> > > > [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable
> > > > startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> > > > java.lang.StringIndexOutOfBoundsException: String index out of range: -1
> > > > at java.lang.String.substring(String.java:1949)
> > > > at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
> > > > at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
> > > > at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
> > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > at kafka.log.LogManager.<init>(LogManager.scala:65)
> > > > at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
> > > > at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
> > > > at kafka.Kafka$.main(Kafka.scala:50)
> > > > at kafka.Kafka.main(Kafka.scala)
> > > >
> > > > 2012/7/28 Jun Rao <[email protected]>
> > > >
> > > > > Jian,
> > > > >
> > > > > I am not sure if I understand this completely. Dropping packets in
> > > > > TCP shouldn't cause corruption in the TCP buffer, right? Is this
> > > > > an issue in Kafka or in the OS/JVM?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <[email protected]> wrote:
> > > > >
> > > > > > Jun:
> > > > > >
> > > > > > Yes, if the socket server can't handle the packets quickly
> > > > > > enough, TCP will drop some network packets once the buffer
> > > > > > overflows; the corrupted messages also appear in this situation!
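[Editor's note] The StringIndexOutOfBoundsException quoted above comes from splitting a log directory name of the form topic-partition at its last '-'. For a directory named just "a" there is no '-', `lastIndexOf` returns -1, and `substring` throws. A minimal sketch (the parse logic is modeled on kafka.utils.Utils.getTopicPartition in 0.7; the defensive variant is an illustrative assumption, not Kafka code):

```java
public class TopicPartitionParse {
    // Roughly what Utils.getTopicPartition does: split "topic-partition" at
    // the last '-'. With no '-', lastIndexOf returns -1 and
    // substring(0, -1) throws StringIndexOutOfBoundsException.
    public static String[] getTopicPartition(String dirName) {
        int index = dirName.lastIndexOf('-');
        return new String[] { dirName.substring(0, index),
                              dirName.substring(index + 1) };
    }

    // A defensive variant that rejects malformed directory names instead of
    // crashing broker startup.
    public static String[] safeTopicPartition(String dirName) {
        int index = dirName.lastIndexOf('-');
        if (index <= 0 || index == dirName.length() - 1) {
            throw new IllegalArgumentException(
                "not a topic-partition directory: " + dirName);
        }
        return getTopicPartition(dirName);
    }

    public static void main(String[] args) {
        String[] tp = getTopicPartition("a-0");
        System.out.println(tp[0] + " / " + tp[1]);
        try {
            getTopicPartition("a");  // no '-': blows up, as in the log above
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("failed on 'a': " + e.getMessage());
        }
    }
}
```

This explains why a stray directory "a" is fatal on restart: LogManager assumes every entry under the log directory parses as topic-partition.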
> > > > > > I ran a systemtap script to find the packet dropping; you can
> > > > > > also run "cat /proc/net/sockstat" to see the TCP memory
> > > > > > increase. I debugged the whole Kafka source code to find the bug
> > > > > > in file.mkdir() in LogManager.createLog.
> > > > > >
> > > > > > Jian Fan
> > > > > >
> > > > > > 2012/7/27 Jun Rao <[email protected]>
> > > > > >
> > > > > > > Thanks for the finding. Are you saying that this problem is
> > > > > > > caused by the buffering in the Kafka socket server? How did
> > > > > > > you figure that out? Is this problem exposed by the same test
> > > > > > > that caused the corrupted messages in the broker?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <[email protected]> wrote:
> > > > > > >
> > > > > > > > In a high-concurrency environment, the TCP server will drop
> > > > > > > > some packets when the TCP buffer overflows. Then
> > > > > > > > LogManager.createLog will create logs for non-existent
> > > > > > > > topics. But one thing is very strange: the log directories
> > > > > > > > should be named like a-0, a-1, a-2 and so on, but
> > > > > > > > file.mkdir() creates a directory named just "a". There seems
> > > > > > > > to be some bug in the file.mkdir() call in
> > > > > > > > LogManager.createLog.
> > > > > > > >
> > > > > > > > The exception message is:
> > > > > > > >
> > > > > > > > [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
> > > > > > > > [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
> > > > > > > > java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
> > > > > > > > at java.io.RandomAccessFile.open(Native Method)
> > > > > > > > at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
> > > > > > > > at kafka.utils.Utils$.openChannel(Utils.scala:324)
> > > > > > > > at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
> > > > > > > > at kafka.log.Log.loadSegments(Log.scala:144)
> > > > > > > > at kafka.log.Log.<init>(Log.scala:116)
> > > > > > > > at kafka.log.LogManager.createLog(LogManager.scala:159)
> > > > > > > > at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
> > > > > > > > at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
> > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
> > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > > > > > > > at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> > > > > > > > at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> > > > > > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > > > > > > > at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> > > > > > > > at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
> > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > > at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
> > > > > > > > at kafka.network.Processor.handle(SocketServer.scala:296)
> > > > > > > > at kafka.network.Processor.read(SocketServer.scala:319)
> > > > > > > > at kafka.network.Processor.run(SocketServer.scala:214)
> > > > > > > > at java.lang.Thread.run(Thread.java:679)
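[Editor's note] The FileNotFoundException ("Is a directory") in the trace above occurs because a directory already occupies the exact path where the broker tries to open its segment file with RandomAccessFile. A small hedged reproduction of that failure mode (the temp-directory layout is made up for illustration; this mimics the symptom, not Kafka's actual code path):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;

public class IsADirectoryDemo {
    public static void main(String[] args) throws IOException {
        // Simulate a corrupted layout: something created a directory where
        // the broker expects its segment *file* to live.
        File logDir = Files.createTempDirectory("axx-0").toFile();
        File segment = new File(logDir, "00000000000000000000.kafka");
        segment.mkdir();  // a directory now occupies the segment file's path

        try (RandomAccessFile raf = new RandomAccessFile(segment, "rw")) {
            System.out.println("opened " + segment);
        } catch (IOException e) {
            // On Linux this surfaces as FileNotFoundException with the
            // message suffix "(Is a directory)", as in the broker log above.
            System.out.println("open failed: " + e.getClass().getSimpleName());
        }
    }
}
```

Note that File.mkdir() itself succeeds here; the real question in the thread is why a directory was ever created at a path that should name a file, which is what makes the subsequent open fail.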
