The producer machine lists 'localhost:9092' for the Kafka connection? They're on two different machines, aren't they?
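To make the point concrete, here is a minimal sketch of a guard for the producer side. It assumes the broker is reachable at some routable address; `broker_list` and `BROKER_HOST` are hypothetical names used only for illustration and are not part of Kafka.

```shell
#!/bin/sh
# Sketch: build the --broker-list value for a producer that runs on a
# different machine than the broker. broker_list is a hypothetical helper,
# not something shipped with Kafka.
broker_list() {
  bl_host="$1"
  bl_port="${2:-9092}"   # 9092 is the broker port used in this thread
  case "$bl_host" in
    ""|localhost|127.*|::1)
      # On the producer machine these all point back at the producer itself.
      echo "broker host '$bl_host' refers to the local machine" >&2
      return 1
      ;;
  esac
  printf '%s:%s\n' "$bl_host" "$bl_port"
}

# Usage on the producer machine (BROKER_HOST is a placeholder to fill in
# with the address of the kafka/zookeeper machine):
#   bin/kafka-console-producer.sh \
#     --broker-list "$(broker_list "$BROKER_HOST")" --topic test
```

If the producer still fails after that, it may be worth checking what hostname the broker hands back to clients; in the 0.8.x `config/server.properties` that is likely the `advertised.host.name` setting, though I have not verified it against this exact setup.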
-----Original Message-----
From: Gwen Shapira [gshap...@cloudera.com]
Received: Tuesday, 17 Feb 2015, 8:57PM
To: users@kafka.apache.org [users@kafka.apache.org]
Subject: Re: Having trouble with the simplest remote kafka config

Is it possible that you have iptables on the Ubuntu where you run your broker? Try disabling iptables and see if it fixes the issue.

On Tue, Feb 17, 2015 at 8:47 PM, Richard Spillane <r...@defend7.com> wrote:

> So I would like to have two machines: one running zookeeper and a single
> kafka node and another machine running a producer. I want to use the basic
> commands mentioned in the Quick Start guide to do this. However, I keep
> getting connection closed exceptions in the producer.
>
> This is what I do:
> On the kafka/zookeeper machine:
> bin/zookeeper-server-start.sh config/zookeeper.properties &
> bin/kafka-server-start.sh config/server.properties &
> bin/kafka-topics.sh --create --zookeeper localhost:2181
> --replication-factor 1 --partitions 1 --topic test
>
> …so far so good, now on the producer machine:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> [2015-02-17 20:43:28,466] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
> echo
> <press enter to send ‘echo’ above>
>
> …now it starts spewing the errors in the Producer Errors Appendix.
>
> What I don’t understand is why? I checked the default configurations and
> it is binding to all interfaces as the bind to localhost is commented out.
> I checked netstat and 9092 is open on the zookeeper/kafka machine. I have
> tried this with an Ubuntu VM and a container where the container hosts the
> zookeeper/kafka server and I have tried this with my native machine (OS X)
> and an Ubuntu VM where the VM is the zookeeper/kafka server. In both cases
> the same thing happens.
>
> I am just trying to get the simplest possible configuration where the
> producer is not on the same machine as the kafka queue up and running. How
> can I make this work? Thanks for any help.
>
> Producer Errors Appendix:
>
> [2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:32,625] ERROR fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-02-17 20:43:32,627] WARN Fetching topic metadata with correlation id
> 1 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:32,628] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(test)] from
> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2015-02-17 20:43:32,734] WARN Fetching topic metadata with correlation id
> 2 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:32,735] ERROR fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-02-17 20:43:32,737] WARN Fetching topic metadata with correlation id
> 3 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:32,738] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(test)] from
> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2015-02-17 20:43:32,844] WARN Fetching topic metadata with correlation id
> 4 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:32,844] ERROR fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-02-17 20:43:32,846] WARN Fetching topic metadata with correlation id
> 5 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:32,847] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(test)] from
> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2015-02-17 20:43:32,953] WARN Fetching topic metadata with correlation id
> 6 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:32,954] ERROR fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-02-17 20:43:32,955] WARN Fetching topic metadata with correlation id
> 7 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:32,957] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(test)] from
> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2015-02-17 20:43:33,063] WARN Fetching topic metadata with correlation id
> 8 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-02-17 20:43:33,064] ERROR fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> ... 12 more
> [2015-02-17 20:43:33,066] ERROR Failed to send requests for topics test
> with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> [2015-02-17 20:43:33,067] ERROR Error in handling batch of 1 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)