Yes, the topic I am producing to exists. I can produce to it fine when running the kafka-console-producer.sh tool from the Kafka node (the VM) itself (in which case I can use either localhost or the public-facing IP):
Here is where I produce messages (running on the Kafka node):
==========================
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-console-producer.sh --broker-list 192.168.241.128:9092 --topic test
[2015-02-18 09:29:19,373] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
echo
[2015-02-18 09:29:21,786] INFO Closing socket connection to /192.168.241.128. (kafka.network.Processor)
bobs your uncle
other bobs your uncle
^Z
[3]+ Stopped bin/kafka-console-producer.sh --broker-list 192.168.241.128:9092 --topic test

Here is where I consume them (also on the Kafka node):
==========================
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-console-consumer.sh --zookeeper 192.168.241.128:2181 --topic test --from-beginning
[2015-02-18 09:30:52,566] INFO Accepted socket connection from /192.168.241.128:40532 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:30:52,577] INFO Client attempting to establish new session at /192.168.241.128:40532 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:52,579] INFO Established session 0x14b9afea96f0003 with negotiated timeout 30000 for client /192.168.241.128:40532 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,050] INFO Accepted socket connection from /192.168.241.128:40533 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:30:53,060] INFO Client attempting to establish new session at /192.168.241.128:40533 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,062] INFO Established session 0x14b9afea96f0004 with negotiated timeout 6000 for client /192.168.241.128:40533 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:30:53,237] INFO Got user-level KeeperException when processing sessionid:0x14b9afea96f0004 type:create cxid:0x2 zxid:0x42 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-23321/ids Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321/ids (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,246] INFO Got user-level KeeperException when processing sessionid:0x14b9afea96f0004 type:create cxid:0x3 zxid:0x43 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-23321 Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321 (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,903] INFO Got user-level KeeperException when processing sessionid:0x14b9afea96f0004 type:create cxid:0x19 zxid:0x47 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-23321/owners/test Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321/owners/test (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:53,905] INFO Got user-level KeeperException when processing sessionid:0x14b9afea96f0004 type:create cxid:0x1a zxid:0x48 txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-23321/owners Error:KeeperErrorCode = NoNode for /consumers/console-consumer-23321/owners (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:30:54,101] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
echo
bobs your uncle
other bobs your uncle

The ‘test’ topic does exist:
==========================
rick@ubuntu:~/kafka_2.10-0.8.2.0$ bin/kafka-topics.sh --list --zookeeper localhost:2181
[2015-02-18 09:32:56,991] INFO Accepted socket connection from /127.0.0.1:42143 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2015-02-18 09:32:56,996] INFO Client attempting to establish new session at /127.0.0.1:42143 (org.apache.zookeeper.server.ZooKeeperServer)
[2015-02-18 09:32:56,997] INFO Established session 0x14b9afea96f0006 with negotiated timeout 30000 for client /127.0.0.1:42143 (org.apache.zookeeper.server.ZooKeeperServer)
test
[2015-02-18 09:32:57,053] INFO Processed session termination for sessionid: 0x14b9afea96f0006 (org.apache.zookeeper.server.PrepRequestProcessor)
[2015-02-18 09:32:57,054] INFO Closed socket connection for client /127.0.0.1:42143 which had sessionid 0x14b9afea96f0006 (org.apache.zookeeper.server.NIOServerCnxn)

The version of Kafka I am using is kafka_2.10-0.8.2.0. I am hoping that this configuration is simple enough that debugging would be overkill, and that I have just missed some simple setting used by the Quick Start example that prevents a producer from posting to a remote queue :)

> On Feb 17, 2015, at 10:19 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
> Time to debug Kafka then :)
>
> Does the topic you are producing to exist? (you can check with
> kafka-topics tool)
> If not, do you have auto-creation enabled?
>
> Which version are you on? Is it possible you ran into KAFKA-1738?
>
>
> On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane <r...@defend7.com> wrote:
>
>> Telnet seems to be able to connect from the Mac to the VM and from the VM
>> to the VM:
>>
>> From Mac to VM:
>> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet 192.168.241.128 9092
>> Trying 192.168.241.128...
>> Connected to 192.168.241.128.
>> Escape character is '^]'.
>>
>> From VM to VM:
>> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet localhost 9092
>> Trying ::1...
>> Connected to localhost.
>> Escape character is '^]'.
>>
>> From VM to Mac:
>> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet 192.168.1.27 9092
>> Trying 192.168.1.27...
>> telnet: Unable to connect to remote host: Connection refused
>>
>> From Mac to Mac:
>> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet localhost 9092
>> Trying ::1...
>> telnet: connect to address ::1: Connection refused
>> Trying 127.0.0.1...
>> telnet: connect to address 127.0.0.1: Connection refused
>> telnet: Unable to connect to remote host
>>
>>
>>> On Feb 17, 2015, at 10:03 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>>>
>>> What happens when you telnet to port 9092? try it from both your mac and
>>> the ubuntu vm.
>>>
>>>
>>> On Tue, Feb 17, 2015 at 9:26 PM, Richard Spillane <r...@defend7.com> wrote:
>>>
>>>> I checked iptables and all rules are set to forward, so nothing should be
>>>> blocked in the VM example. In the container example the port is explicitly
>>>> EXPOSEd and other ports in a similar range (e.g., 8080) can be accessed
>>>> just fine.
>>>>
>>>>> On Feb 17, 2015, at 8:56 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>>>>>
>>>>> Is it possible that you have iptables on the Ubuntu where you run your
>>>>> broker?
>>>>>
>>>>> Try disabling iptables and see if it fixes the issue.
>>>>>
>>>>> On Tue, Feb 17, 2015 at 8:47 PM, Richard Spillane <r...@defend7.com> wrote:
>>>>>
>>>>>> So I would like to have two machines: one running zookeeper and a single
>>>>>> kafka node and another machine running a producer. I want to use the basic
>>>>>> commands mentioned in the Quick Start guide to do this. However, I keep
>>>>>> getting connection closed exceptions in the producer.
>>>>>>
>>>>>> This is what I do:
>>>>>> On the kafka/zookeeper machine:
>>>>>> bin/zookeeper-server-start.sh config/zookeeper.properties &
>>>>>> bin/kafka-server-start.sh config/server.properties &
>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
>>>>>>
>>>>>> …so far so good, now on the producer machine:
>>>>>> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>>>>>> [2015-02-17 20:43:28,466] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
>>>>>> echo
>>>>>> <press enter to send ‘echo’ above>
>>>>>>
>>>>>> …now it starts spewing the errors in the Producer Errors Appendix.
>>>>>>
>>>>>> What I don’t understand is why? I checked the default configurations and
>>>>>> it is binding to all interfaces as the bind to localhost is commented out.
>>>>>> I checked netstat and 9092 is open on the zookeeper/kafka machine. I have
>>>>>> tried this with an Ubuntu VM and a container where the container hosts the
>>>>>> zookeeper/kafka server and I have tried this with my native machine (OS X)
>>>>>> and an Ubuntu VM where the VM is the zookeeper/kafka server. In both cases
>>>>>> the same thing happens.
>>>>>>
>>>>>> I am just trying to get the simplest possible configuration where the
>>>>>> producer is not on the same machine as the kafka queue up and running. How
>>>>>> can I make this work? Thanks for any help.
>>>>>>
>>>>>> Producer Errors Appendix:
>>>>>>
>>>>>> [2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
>>>>>> java.nio.channels.ClosedChannelException
>>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>>>>> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
>>>>>> at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>>>>> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>>>>> at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>> at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>>>>> at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>> [2015-02-17 20:43:32,625] ERROR fetching topic metadata for topics [Set(test)] from broker
[ArrayBuffer(id:0,host:localhost,port:9092)] >>>> failed >>>>>> (kafka.utils.Utils$) >>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] >>>> failed >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) >>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) >>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) >>>>>> at >>>>>> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> ... 
12 more >>>>>> [2015-02-17 20:43:32,627] WARN Fetching topic metadata with >> correlation >>>> id >>>>>> 1 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] >>>> failed >>>>>> (kafka.client.ClientUtils$) >>>>>> java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) >>>>>> at >>>>>> >>>> >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>> at >>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) >>>>>> at >>>>>> >>>> >> 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) >>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) >>>>>> at >>>>>> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>> [2015-02-17 20:43:32,628] ERROR Failed to collate messages by topic, >>>>>> partition due to: fetching topic metadata for topics [Set(test)] from >>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed >>>>>> (kafka.producer.async.DefaultEventHandler) >>>>>> [2015-02-17 20:43:32,734] WARN Fetching topic metadata with >> correlation >>>> id >>>>>> 2 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] >>>> failed >>>>>> (kafka.client.ClientUtils$) >>>>>> java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) >>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) >>>>>> at >>>>>> >>>> >> 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) >>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) >>>>>> at >>>>>> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>> [2015-02-17 20:43:32,735] ERROR fetching topic metadata for topics >>>>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] >>>> failed >>>>>> (kafka.utils.Utils$) >>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] >>>> failed >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) >>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) >>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>> at >>>>>> >>>> >> 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) >>>>>> at >>>>>> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> ... 12 more >>>>>> [2015-02-17 20:43:32,737] WARN Fetching topic metadata with >> correlation >>>> id >>>>>> 3 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] >>>> failed >>>>>> (kafka.client.ClientUtils$) >>>>>> java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) >>>>>> 
at >>>>>> >>>> >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>> at >>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) >>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) >>>>>> at >>>>>> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>> [2015-02-17 20:43:32,738] ERROR Failed to collate messages by topic, >>>>>> partition due to: fetching topic metadata for topics [Set(test)] from >>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed >>>>>> (kafka.producer.async.DefaultEventHandler) >>>>>> [2015-02-17 20:43:32,844] WARN Fetching topic metadata with >> correlation >>>> id >>>>>> 4 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] >>>> failed >>>>>> (kafka.client.ClientUtils$) >>>>>> java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at 
kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) >>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) >>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) >>>>>> at >>>>>> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>> [2015-02-17 20:43:32,844] ERROR fetching topic metadata for topics >>>>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] >>>> failed >>>>>> (kafka.utils.Utils$) >>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] >>>> failed >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) >>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) 
>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) >>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) >>>>>> at >>>>>> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> ... 
12 more >>>>>> [2015-02-17 20:43:32,846] WARN Fetching topic metadata with >> correlation >>>> id >>>>>> 5 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] >>>> failed >>>>>> (kafka.client.ClientUtils$) >>>>>> java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) >>>>>> at >>>>>> >>>> >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>> at >>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) >>>>>> at >>>>>> >>>> >> 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) >>>>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>> at >>>>>> >>>> >> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) >>>>>> at >>>>>> >> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>> [2015-02-17 20:43:32,847] ERROR Failed to collate messages by topic, >>>>>> partition due to: fetching topic metadata for topics [Set(test)] from >>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed >>>>>> (kafka.producer.async.DefaultEventHandler) >>>>>> [2015-02-17 20:43:32,953] WARN Fetching topic metadata with >> correlation >>>> id >>>>>> 6 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] >>>> failed >>>>>> (kafka.client.ClientUtils$) >>>>>> java.nio.channels.ClosedChannelException >>>>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>> at >>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>> at >>>>>> >>>> >> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) >>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>> at >>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>> at >>>>>> >>>> >> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) >>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>> at >>>>>> >>>> >> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) >>>>>> at >>>>>> >>>> >> 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>>>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>>>>>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>> [2015-02-17 20:43:32,954] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.utils.Utils$)
>>>>>> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>>>>>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>>>>>>     at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>>>>>>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>>>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>>>>>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>>>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>     ... 12 more
>>>>>> [2015-02-17 20:43:32,955] WARN Fetching topic metadata with correlation id 7 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
>>>>>> java.nio.channels.ClosedChannelException
>>>>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>>>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>>>>>     at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>>>>>>     at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
>>>>>>     at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>>>>>>     at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>     at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
>>>>>>     at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
>>>>>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>>>>>>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>>>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>>>>>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>> [2015-02-17 20:43:32,957] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
>>>>>> [2015-02-17 20:43:33,063] WARN Fetching topic metadata with correlation id 8 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
>>>>>> java.nio.channels.ClosedChannelException
>>>>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>>>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>>>>>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>>>>>>     at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>>>>>>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>>>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>>>>>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>> [2015-02-17 20:43:33,064] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.utils.Utils$)
>>>>>> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>>>>>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>>>>>>     at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>>>>>>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>>>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>>>>>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>>>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>     ... 12 more
>>>>>> [2015-02-17 20:43:33,066] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
>>>>>> [2015-02-17 20:43:33,067] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
>>>>>> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>>>>>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>>>>>>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>>>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>>>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>>>>>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)