I am new to kafka and apologize if this is already answered. I am testing a simple async publisher behavior when broker is down. I use kafka version 8.2.2.
I have set up "queue.buffering.max.messages" to 200 and "queue.enqueue.timeout.ms" set to -1. My understanding is that if "queue.enqueue.timeout.ms" is set to -1, the call to 'producer.send' should block when queue of 200 is reached. But this is not what I am seeing. My publisher has these properties. Properties props = new Properties(); props.put("metadata.broker.list", "cno-d-igoberman2:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("producer.type", "async"); props.put("partitioner.class", "com.kcg.kafka.test.SimplePartitioner"); props.put("request.required.acks", "1"); props.put("queue.buffering.max.messages", "200"); props.put("queue.enqueue.timeout.ms", "-1"); This is scenario I am testing: 1) start broker. 2) start publishing in a loop. 3) kill broker. At this point my producer keeps calling 'producer.send' without blocking (but slows down considerably). I suspect that messages are lost - but this is not what I want. Is this a known limitation of producers in kafka? Any help in clarifying it will be appreciated. Also, I understand that producers are in the process of being redesigned in the next release. When will it be available? Should I even bother with the current version? Thanks This is what I am seeing in the log: 2015-10-30 14:50:29 INFO SyncProducer:68 - Disconnecting from cno-d-igoberman2:9092 2015-10-30 14:50:29 ERROR Utils$:106 - fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) at kafka.utils.Utils$.swallow(Utils.scala:172) at kafka.utils.Logging$class.swallowError(Logging.scala:106) at kafka.utils.Utils$.swallowError(Utils.scala:45) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) Caused by: java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) at kafka.producer.SyncProducer.send(SyncProducer.scala:113) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) ... 12 more 2015-10-30 14:50:29 ERROR DefaultEventHandler:97 - Failed to send requests for topics test with correlation ids in [0,8] 2015-10-30 14:50:29 ERROR ProducerSendThread:103 - Error in handling batch of 5 events kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) 2015-10-30 14:50:29 TRACE ProducerSendThread:36 - Dequeued item for topic test, partition key: 5, data: 1446234628741: 5 2015-10-30 14:50:29 TRACE Producer:36 - Added to send queue an event: KeyedMessage(test,6,6,1446234629741: 6) 2015-10-30 14:50:29 TRACE Producer:36 - Remaining queue size: 200 This e-mail and its attachments are intended only for the individual or entity to whom it is addressed and may contain information that is confidential, privileged, inside information, or subject to other restrictions on use or disclosure. Any unauthorized use, dissemination or copying of this transmission or the information in it is prohibited and may be unlawful. If you have received this transmission in error, please notify the sender immediately by return e-mail, and permanently delete or destroy this e-mail, any attachments, and all copies (digital or paper). Unless expressly stated in this e-mail, nothing in this message should be construed as a digital or electronic signature. For additional important disclaimers and disclosures regarding KCG's products and services, please click on the following link: http://www.kcg.com/legal/global-disclosures