I am new to Kafka and apologize if this has already been answered. I am testing
the behavior of a simple async publisher when the broker is down. I am using
Kafka version 0.8.2.2.


I have set "queue.buffering.max.messages" to 200 and
"queue.enqueue.timeout.ms" to -1. My understanding is that with
"queue.enqueue.timeout.ms" set to -1, the call to 'producer.send' should
block once the queue holds 200 messages. But this is not what I am seeing.
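
To make my expectation concrete, here is a small stand-alone sketch of how I
read the "queue.enqueue.timeout.ms" setting. It uses only java.util.concurrent,
not Kafka itself. The class EnqueueTimeoutSketch and the helper enqueue are
hypothetical names of my own, and the mapping of 0, negative and positive
values is my understanding of the setting, not something I have verified in
the producer code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EnqueueTimeoutSketch {

    // Hypothetical helper (not a Kafka API): models the three cases I expect
    // for queue.enqueue.timeout.ms on a bounded in-memory queue.
    static <T> boolean enqueue(BlockingQueue<T> queue, T message, long timeoutMs) {
        try {
            if (timeoutMs == 0) {
                // Never block: the message is rejected if the queue is full.
                return queue.offer(message);
            } else if (timeoutMs < 0) {
                // Block for as long as it takes until space is available.
                queue.put(message);
                return true;
            } else {
                // Block for at most timeoutMs, then give up.
                return queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Same bound as queue.buffering.max.messages in my test.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(200);
        for (int i = 0; i < 200; i++) {
            enqueue(queue, "message " + i, -1);
        }
        System.out.println("remaining capacity: " + queue.remainingCapacity());
        System.out.println("timeout 0 accepted: " + enqueue(queue, "extra", 0));
        System.out.println("timeout 50 accepted: " + enqueue(queue, "extra", 50));
        // enqueue(queue, "extra", -1) would block here indefinitely,
        // because nothing is draining the queue in this sketch.
    }
}
```

With nothing draining the queue, the 201st call with a timeout of -1 never
returns. That is the blocking I expected to see from 'producer.send'.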


My publisher has these properties:
        Properties props = new Properties();
        props.put("metadata.broker.list", "cno-d-igoberman2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");
        props.put("partitioner.class", "com.kcg.kafka.test.SimplePartitioner");
        props.put("request.required.acks", "1");
        props.put("queue.buffering.max.messages", "200");
        props.put("queue.enqueue.timeout.ms", "-1");


This is the scenario I am testing:

1) start broker.

2) start publishing in a loop.

3) kill broker.


At this point my producer keeps calling 'producer.send' without blocking
(although it slows down considerably). I suspect that messages are being lost,
which is not what I want. Is this a known limitation of producers in Kafka?
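
One way I can picture what I am seeing is sketched below. It is only a guess
based on the log at the end of this message, not something I have confirmed in
the Kafka source. It assumes a background sender thread that dequeues small
batches, retries a few times, and then discards the batch, which frees space
in the queue so that the caller is never blocked for long. The batch size of 5
and the 3 tries are taken from the log. The class DroppingSenderSketch, the
method run and the always-failing trySend are hypothetical stand-ins written
with plain java.util.concurrent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DroppingSenderSketch {
    static final int QUEUE_SIZE = 200; // queue.buffering.max.messages in my test
    static final int BATCH_SIZE = 5;   // "batch of 5 events" in the log
    static final int MAX_TRIES = 3;    // "after 3 tries" in the log

    // Stand-in for a send to a broker that is down: it always fails.
    static boolean trySend(List<String> batch) {
        return false;
    }

    // Enqueues totalMessages with a blocking put and returns how many
    // messages the sender thread gave up on.
    static int run(int totalMessages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_SIZE);
        AtomicBoolean producerDone = new AtomicBoolean(false);
        AtomicInteger dropped = new AtomicInteger();

        Thread sender = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            try {
                // Keep going until the producer has finished and the queue is empty.
                while (!(producerDone.get() && queue.isEmpty())) {
                    String first = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (first == null) {
                        continue;
                    }
                    batch.add(first);
                    queue.drainTo(batch, BATCH_SIZE - 1);
                    boolean sent = false;
                    for (int attempt = 0; attempt < MAX_TRIES && !sent; attempt++) {
                        sent = trySend(batch);
                    }
                    if (!sent) {
                        // The batch is discarded, which frees queue capacity.
                        dropped.addAndGet(batch.size());
                    }
                    batch.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.start();

        try {
            for (int i = 0; i < totalMessages; i++) {
                queue.put("message " + i); // blocks only while the queue is full
            }
            producerDone.set(true);
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while publishing", e);
        }
        return dropped.get();
    }

    public static void main(String[] args) {
        int total = 1000;
        System.out.println("enqueued " + total + ", dropped " + run(total));
    }
}
```

In this sketch every put eventually returns even though no message is ever
delivered, which is the kind of silent loss I am worried about.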

Any help in clarifying this would be appreciated. Also, I understand that the
producer is being redesigned for the next release. When will it be available?
Should I even bother with the current version?

Thanks


This is what I am seeing in the log:


2015-10-30 14:50:29 INFO  SyncProducer:68 - Disconnecting from cno-d-igoberman2:9092
2015-10-30 14:50:29 ERROR Utils$:106 - fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    ... 12 more
2015-10-30 14:50:29 ERROR DefaultEventHandler:97 - Failed to send requests for topics test with correlation ids in [0,8]
2015-10-30 14:50:29 ERROR ProducerSendThread:103 - Error in handling batch of 5 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
2015-10-30 14:50:29 TRACE ProducerSendThread:36 - Dequeued item for topic test, partition key: 5, data: 1446234628741: 5
2015-10-30 14:50:29 TRACE Producer:36 - Added to send queue an event: KeyedMessage(test,6,6,1446234629741: 6)
2015-10-30 14:50:29 TRACE Producer:36 - Remaining queue size: 200


