[ https://issues.apache.org/jira/browse/KAFKA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13181141#comment-13181141 ]
Jun Rao commented on KAFKA-202: ------------------------------- Overall, the patch looks good. Some comments: 1. KafkaServer.startup doesn't have to capture exception and shutdown. The caller in KafkaServerStarable already does that. Plus, it shuts down embedded consumer appropriately if needed. 2. There is KafkaRequestHandlers.scala.rej in the patch. 3. Unit test seems to fail occasionally, giving the following error. [info] == core-kafka / kafka.integration.LazyInitProducerTest == [2012-01-05 21:57:38,773] ERROR Error processing MultiProducerRequest on test:0 (kafka.server.KafkaApis:82) java.nio.channels.ClosedChannelException at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88) at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184) at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75) at kafka.message.FileMessageSet.append(FileMessageSet.scala:161) at kafka.log.Log.append(Log.scala:215) at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71) at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64) at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64) at kafka.server.KafkaApis.handle(KafkaApis.scala:43) at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70) at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35) at java.lang.Thread.run(Thread.java:662) [2012-01-05 21:57:38,773] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaApis:82) java.nio.channels.ClosedChannelException at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88) at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184) at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75) at kafka.message.FileMessageSet.append(FileMessageSet.scala:161) at kafka.log.Log.append(Log.scala:215) at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55) at kafka.server.KafkaApis.handle(KafkaApis.scala:40) at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70) at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35) at java.lang.Thread.run(Thread.java:662) [2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while handling producer request: null (kafka.server.KafkaApis:92) java.nio.channels.ClosedChannelException at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88) at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184) at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75) at kafka.message.FileMessageSet.append(FileMessageSet.scala:161) at kafka.log.Log.append(Log.scala:215) at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71) at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55) at kafka.server.KafkaApis.handle(KafkaApis.scala:40) at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70) at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35) at java.lang.Thread.run(Thread.java:662) [2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while handling producer request: null (kafka.server.KafkaApis:92) java.nio.channels.ClosedChannelException at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88) at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184) at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75) at kafka.message.FileMessageSet.append(FileMessageSet.scala:161) at kafka.log.Log.append(Log.scala:215) at kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71) at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64) at kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) at scala.collection.TraversableLike$class.map(TraversableLike.scala:206) at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34) at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64) at kafka.server.KafkaApis.handle(KafkaApis.scala:43) at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70) at kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35) at java.lang.Thread.run(Thread.java:662) [info] Test Starting: testProduceAndFetch(kafka.integration.LazyInitProducerTest) > Make the request processing in kafka asynchonous > ------------------------------------------------ > > Key: KAFKA-202 > URL: https://issues.apache.org/jira/browse/KAFKA-202 > Project: Kafka > Issue Type: New Feature > Reporter: Jay Kreps > Assignee: Jay Kreps > Attachments: KAFKA-202-v2.patch, KAFKA-202-v3.patch, > KAFKA-48-socket-server-refactor-draft.patch > > > We need to handle long-lived requests to support replication. To make this > work we need to make the processing mechanism asynchronous from the network > threads. > To accomplish this we will retain the existing pool of network threads but > add a new pool of request handling threads. These will do all the disk I/O. > There will be a queue mechanism to transfer requests to and from this > secondary pool. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira