[ 
https://issues.apache.org/jira/browse/KAFKA-145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13121537#comment-13121537
 ] 

Jun Rao commented on KAFKA-145:
-------------------------------

In MirroringThread.run, it's probably better to do the countdown even when we 
hit an exception.
                
> Kafka server mirror shutdown bug
> --------------------------------
>
>                 Key: KAFKA-145
>                 URL: https://issues.apache.org/jira/browse/KAFKA-145
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>             Fix For: 0.7
>
>         Attachments: KAFKA-145.patch
>
>
> When a machine that is mirroring data off of another Kafka broker is 
> shutdown, it runs into the following exception, effectively dropping data. 
> The shutdown API needs to be fixed to first shutdown the consumer threads, 
> drain all the data to the producer, and only then shutdown the producer. 
> FATAL kafka.server.EmbeddedConsumer  - 
> kafka.producer.async.QueueClosedException: Attempt to add event to a closed 
> queue.kafka.producer.async.QueueClosedException: Attempt to add event to a 
> closed queue.
>         at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:87)
>         at 
> kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
>         at 
> kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
>         at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at 
> kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
>         at 
> kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at 
> kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
>         at 
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>         at 
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>         at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
>         at kafka.producer.Producer.zkSend(Producer.scala:144)
>         at kafka.producer.Producer.send(Producer.scala:106)
>         at 
> kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
>         at 
> kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to