[
https://issues.apache.org/jira/browse/KAFKA-145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Neha Narkhede updated KAFKA-145:
--------------------------------
Attachment: KAFKA-145.patch
This patch corrects the shutdown behavior of Kafka mirroring, i.e. the
EmbeddedConsumer. It first shuts down the new-topic watcher, then the ZooKeeper
consumer connector. After this, it stops the mirroring threads. At this point,
all mirroring threads have finished mirroring all the data they have
consumed. Only then does it shut down the producer.
This ensures that the producer is not shut down before the mirroring threads
have finished their work, thereby avoiding the data loss caused by
QueueClosedException.
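
The ordering described above can be illustrated with a minimal, self-contained
sketch. It is not the code in the attached patch: MirrorShutdownSketch,
SketchProducer, MirrorThread and QueueClosedError are hypothetical stand-ins
that only model the idea of draining the mirroring thread before closing the
producer.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration; these are NOT the real Kafka classes.
object MirrorShutdownSketch {

  class QueueClosedError(msg: String) extends RuntimeException(msg)

  // Stand-in for the async producer: rejects events once it has been closed.
  class SketchProducer {
    private var closed = false
    private val sent = ArrayBuffer.empty[String]

    def send(event: String): Unit = synchronized {
      if (closed) throw new QueueClosedError("Attempt to add event to a closed queue.")
      sent += event
    }
    def close(): Unit = synchronized { closed = true }
    def sentCount: Int = synchronized { sent.size }
  }

  // Stand-in for one mirroring thread: forwards consumed events to the producer.
  class MirrorThread(source: LinkedBlockingQueue[String], producer: SketchProducer)
      extends Thread {
    private val running = new AtomicBoolean(true)

    override def run(): Unit = {
      // Keep forwarding until asked to stop AND everything consumed has been sent.
      while (running.get || !source.isEmpty) {
        val event = source.poll(10, TimeUnit.MILLISECONDS)
        if (event != null) producer.send(event)
      }
    }

    // Signal the thread to stop, then block until it has drained its queue.
    def shutdown(): Unit = { running.set(false); join() }
  }

  // Returns (number of events delivered, whether a send after close was rejected).
  def demo(): (Int, Boolean) = {
    val consumed = new LinkedBlockingQueue[String]()
    (1 to 100).foreach(i => consumed.put(s"event-$i"))
    val producer = new SketchProducer
    val mirror = new MirrorThread(consumed, producer)
    mirror.start()

    // Steps 1 and 2 (topic watcher, consumer connector) are modeled by simply
    // not adding anything further to `consumed` from here on.
    mirror.shutdown() // Step 3: stop the mirroring thread after it drains.
    producer.close()  // Step 4: only now close the producer.

    val rejected =
      try { producer.send("late-event"); false }
      catch { case _: QueueClosedError => true }
    (producer.sentCount, rejected)
  }
}
```

Calling MirrorShutdownSketch.demo() delivers every queued event before the
producer closes; the deliberate send after close is the only one rejected.
Closing the producer before mirror.shutdown() would instead let the mirroring
thread hit the closed-queue error while it still holds unsent events.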
> Kafka server mirror shutdown bug
> --------------------------------
>
> Key: KAFKA-145
> URL: https://issues.apache.org/jira/browse/KAFKA-145
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Fix For: 0.7
>
> Attachments: KAFKA-145.patch
>
>
> When a machine that is mirroring data off of another Kafka broker is
> shut down, it runs into the following exception, effectively dropping data.
> The shutdown API needs to be fixed to first shut down the consumer threads,
> drain all the data to the producer, and only then shut down the producer.
> FATAL kafka.server.EmbeddedConsumer - kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
> kafka.producer.async.QueueClosedException: Attempt to add event to a closed queue.
> at kafka.producer.async.AsyncProducer.send(AsyncProducer.scala:87)
> at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
> at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1$$anonfun$apply$2.apply(ProducerPool.scala:131)
> at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
> at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:131)
> at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$apply$mcVI$sp$1.apply(ProducerPool.scala:130)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:130)
> at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
> at kafka.producer.Producer.zkSend(Producer.scala:144)
> at kafka.producer.Producer.send(Producer.scala:106)
> at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:136)
> at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$1$$anonfun$apply$1$$anon$1$$anonfun$run$1.apply(KafkaServerStartable.scala:134)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)