I am new to Kafka and apologize if this has already been answered. I am testing
the behavior of a simple async publisher when the broker is down. I use Kafka
version 0.8.2.2. I have set "queue.buffering.max.messages" to 200 and
"queue.enqueue.timeout.ms" to -1. My understanding is that with
"queue.enqueue.timeout.ms" set to -1, the call to 'producer.send' should
block once the queue holds 200 messages. But this is not what I am seeing.
My publisher has these properties:
Properties props = new Properties();
props.put("metadata.broker.list", "cno-d-igoberman2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "async");
props.put("partitioner.class", "com.kcg.kafka.test.SimplePartitioner");
props.put("request.required.acks", "1");
props.put("queue.buffering.max.messages", "200");
props.put("queue.enqueue.timeout.ms", "-1");
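To make my expectation concrete, here is a minimal sketch of the semantics I am assuming for "queue.enqueue.timeout.ms", written against a plain java.util.concurrent queue rather than Kafka's own classes. The class name and the enqueue helper are hypothetical and exist only for illustration; the three cases follow my reading of the configuration description, not the producer source.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class EnqueueTimeoutSketch {
    // Hypothetical helper modelling the assumed meaning of queue.enqueue.timeout.ms:
    //   0  -> enqueue immediately or give up if the queue is full
    //  <0  -> block until there is room
    //  >0  -> block for at most that many milliseconds
    static <T> boolean enqueue(BlockingQueue<T> queue, T msg, long timeoutMs)
            throws InterruptedException {
        if (timeoutMs == 0) {
            return queue.offer(msg);
        }
        if (timeoutMs < 0) {
            queue.put(msg);
            return true;
        }
        return queue.offer(msg, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Capacity 1 stands in for queue.buffering.max.messages.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(enqueue(queue, "a", 0));  // queue has room
        System.out.println(enqueue(queue, "b", 0));  // queue is full, no waiting
        System.out.println(enqueue(queue, "c", 50)); // queue is full, waits up to 50 ms

        // With a negative timeout the caller should wait until someone drains the queue.
        Thread blocked = new Thread(() -> {
            try {
                enqueue(queue, "d", -1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        blocked.start();
        blocked.join(200);
        System.out.println("still blocked: " + blocked.isAlive());
        queue.take();   // free one slot so the blocked call can finish
        blocked.join();
    }
}
```

In this model the call with -1 only waits while the queue is actually full; as soon as a consumer removes an element, the call returns.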
This is the scenario I am testing:
1) start broker.
2) start publishing in a loop.
3) kill broker.
At this point my producer keeps calling 'producer.send' without blocking
(although it slows down considerably). I suspect that messages are being lost,
which is not what I want. Is this a known limitation of producers in Kafka?
Any help in clarifying this would be appreciated. Also, I understand that
producers are being redesigned for the next release. When will it be
available? Should I even bother with the current version?
Thanks
This is what I am seeing in the log:
2015-10-30 14:50:29 INFO SyncProducer:68 - Disconnecting from cno-d-igoberman2:9092
2015-10-30 14:50:29 ERROR Utils$:106 - fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:cno-d-igoberman2,port:9092)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    ... 12 more
2015-10-30 14:50:29 ERROR DefaultEventHandler:97 - Failed to send requests for topics test with correlation ids in [0,8]
2015-10-30 14:50:29 ERROR ProducerSendThread:103 - Error in handling batch of 5 events
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:547)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
2015-10-30 14:50:29 TRACE ProducerSendThread:36 - Dequeued item for topic test, partition key: 5, data: 1446234628741: 5
2015-10-30 14:50:29 TRACE Producer:36 - Added to send queue an event: KeyedMessage(test,6,6,1446234629741: 6)
2015-10-30 14:50:29 TRACE Producer:36 - Remaining queue size: 200
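One possible reading of this log, which I have not confirmed against the Kafka source, is that the send thread keeps dequeuing batches, gives up on each one after 3 tries, and discards it, so the queue never stays full and 'producer.send' never blocks for long. The sketch below models that idea with plain java.util.concurrent only. The class, the trySend stub, and the constants are hypothetical; the batch size of 5 and the 3 tries are taken from the log lines, and I am assuming that "Remaining queue size" reports free capacity rather than queued messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DroppingSendThreadSketch {
    static final int QUEUE_CAPACITY = 200; // queue.buffering.max.messages
    static final int BATCH_SIZE = 5;       // "batch of 5 events" in the log
    static final int TRIES = 3;            // "after 3 tries" in the log

    // Stand-in for a broker that is down: every send attempt fails.
    static boolean trySend(List<Integer> batch) {
        return false;
    }

    // Publishes totalMessages through a bounded queue and returns how many were dropped.
    static int run(int totalMessages) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        AtomicInteger dropped = new AtomicInteger();

        Thread sendThread = new Thread(() -> {
            List<Integer> batch = new ArrayList<>();
            int seen = 0;
            try {
                while (seen < totalMessages) {
                    Integer msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        batch.add(msg);
                        seen++;
                    }
                    // Flush on a full batch, on an idle poll, or at the end of input.
                    boolean flush = batch.size() >= BATCH_SIZE || msg == null || seen == totalMessages;
                    if (flush && !batch.isEmpty()) {
                        boolean sent = false;
                        for (int i = 0; i < TRIES && !sent; i++) {
                            sent = trySend(batch);
                        }
                        if (!sent) {
                            dropped.addAndGet(batch.size()); // batch is discarded, not re-queued
                        }
                        batch.clear();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sendThread.start();

        for (int i = 0; i < totalMessages; i++) {
            queue.put(i); // blocks only while the queue is full
        }
        sendThread.join();
        return dropped.get();
    }

    public static void main(String[] args) throws Exception {
        int total = 1000;
        System.out.println("dropped " + run(total) + " of " + total + " messages");
    }
}
```

In this model the publisher finishes even though nothing is ever delivered, which would match what I see: the loop keeps running and every message is lost. If the real producer also waits between retries, that might explain the slowdown.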