[ 
https://issues.apache.org/jira/browse/KAFKA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13181141#comment-13181141
 ] 

Jun Rao commented on KAFKA-202:
-------------------------------

Overall, the patch looks good. Some comments:

1. KafkaServer.startup doesn't have to catch the exception and shut down. The 
caller in KafkaServerStartable already does that. Plus, it shuts down the 
embedded consumer appropriately if needed.

2. There is KafkaRequestHandlers.scala.rej in the patch.

3. A unit test seems to fail occasionally, giving the following error:
[info] == core-kafka / kafka.integration.LazyInitProducerTest ==
[2012-01-05 21:57:38,773] ERROR Error processing MultiProducerRequest on test:0 
(kafka.server.KafkaApis:82)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
        at 
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
        at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
        at kafka.log.Log.append(Log.scala:215)
        at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
        at 
kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
        at 
kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
        at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:43)
        at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
        at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
        at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,773] ERROR Error processing ProduceRequest on test:0 
(kafka.server.KafkaApis:82)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
        at 
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
        at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
        at kafka.log.Log.append(Log.scala:215)
        at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
        at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
        at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
        at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while 
handling producer request: null (kafka.server.KafkaApis:92)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
        at 
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
        at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
        at kafka.log.Log.append(Log.scala:215)
        at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
        at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:55)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:40)
        at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
        at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
        at java.lang.Thread.run(Thread.java:662)
[2012-01-05 21:57:38,775] FATAL Halting due to unrecoverable I/O error while 
handling producer request: null (kafka.server.KafkaApis:92)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
        at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
        at 
kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:75)
        at kafka.message.FileMessageSet.append(FileMessageSet.scala:161)
        at kafka.log.Log.append(Log.scala:215)
        at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$handleProducerRequest(KafkaApis.scala:71)
        at 
kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
        at 
kafka.server.KafkaApis$$anonfun$handleMultiProducerRequest$1.apply(KafkaApis.scala:64)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
        at kafka.server.KafkaApis.handleMultiProducerRequest(KafkaApis.scala:64)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:43)
        at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
        at 
kafka.server.KafkaServer$$anonfun$startup$2.apply(KafkaServer.scala:70)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:35)
        at java.lang.Thread.run(Thread.java:662)
[info] Test Starting: 
testProduceAndFetch(kafka.integration.LazyInitProducerTest)

                
> Make the request processing in kafka asynchronous
> -------------------------------------------------
>
>                 Key: KAFKA-202
>                 URL: https://issues.apache.org/jira/browse/KAFKA-202
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: KAFKA-202-v2.patch, KAFKA-202-v3.patch, 
> KAFKA-48-socket-server-refactor-draft.patch
>
>
> We need to handle long-lived requests to support replication. To make this 
> work we need to make the processing mechanism asynchronous from the network 
> threads.
> To accomplish this we will retain the existing pool of network threads but 
> add a new pool of request handling threads. These will do all the disk I/O. 
> There will be a queue mechanism to transfer requests to and from this 
> secondary pool.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to