[
https://issues.apache.org/jira/browse/KAFKA-145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Neha Narkhede updated KAFKA-145:
--------------------------------
Attachment: KAFKA-145.patch
Uploading an updated patch, in which the shutdown latch is decremented in a
finally block, to make sure the mirroring threads will shutdown even when they
run into an error/exception.
> Kafka server mirror shutdown bug
> --------------------------------
>
> Key: KAFKA-145
> URL: https://issues.apache.org/jira/browse/KAFKA-145
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Fix For: 0.7
>
> Attachments: KAFKA-145.patch, KAFKA-145.patch
>
>
> When a machine that is mirroring data off of another Kafka broker is
> shutdown, it runs into the following exception, effectively dropping data.
> The shutdown API needs to be fixed to first shutdown the consumer threads,
> drain all the data to the producer, and only then shutdown the producer.
> FATAL kafka.server.EmbeddedConsumer -
> kafka.producer.async.QueueClosedException: Attempt to add event to a closed
> queue.kafka.producer.async.QueueClosedException: Attempt to add event to a
> closed queue.
> at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:87)
> at
> kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
> at
> kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
> at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at
> kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
> at
> kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at
> kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
> at
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> at
> kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
> at kafka.producer.Producer.zkSend(Producer.scala:144)
> at kafka.producer.Producer.send(Producer.scala:106)
> at
> kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
> at
> kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira