Time to debug Kafka then :) Does the topic you are producing to exist? (You can check with the kafka-topics tool.) If not, do you have auto-creation enabled?
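For reference, a rough sketch of both checks. It assumes it is run from the Kafka install directory on the broker machine, with ZooKeeper on localhost:2181 and the topic named test, as in the commands quoted further down. The auto_create_setting helper is a made-up name for illustration, not part of Kafka; it only reads the properties file and falls back to the broker default of true when the property is not set.

```shell
# Hypothetical helper (not a Kafka tool): print the effective value of
# auto.create.topics.enable from a broker properties file.
# The broker default is true when the property is absent or commented out.
auto_create_setting() {
  props_file="$1"
  value=$(grep -E '^[[:space:]]*auto\.create\.topics\.enable[[:space:]]*=' "$props_file" 2>/dev/null \
    | tail -n 1 | cut -d= -f2 | tr -d '[:space:]' || true)
  echo "${value:-true}"
}

# Only run the checks when we are inside a Kafka install directory.
if [ -x bin/kafka-topics.sh ]; then
  # Prints partition, leader, and replica details if the topic exists.
  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
  # Prints true or false for topic auto-creation.
  auto_create_setting config/server.properties
fi
```

If the describe call shows nothing for the topic and the second check prints false, the producer has no topic to fetch metadata for.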
Which version are you on? Is it possible you ran into KAFKA-1738?

On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane <r...@defend7.com> wrote:

> Telnet seems to be able to connect from the Mac to the VM and from the VM
> to the VM:
>
> From Mac to VM:
> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet 192.168.241.128 9092
> Trying 192.168.241.128...
> Connected to 192.168.241.128.
> Escape character is '^]'.
>
> From VM to VM:
> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet localhost 9092
> Trying ::1...
> Connected to localhost.
> Escape character is '^]'.
>
> From VM to Mac:
> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet 192.168.1.27 9092
> Trying 192.168.1.27...
> telnet: Unable to connect to remote host: Connection refused
>
> From Mac to Mac:
> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet localhost 9092
> Trying ::1...
> telnet: connect to address ::1: Connection refused
> Trying 127.0.0.1...
> telnet: connect to address 127.0.0.1: Connection refused
> telnet: Unable to connect to remote host
>
> > On Feb 17, 2015, at 10:03 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> >
> > What happens when you telnet to port 9092? try it from both your mac and
> > the ubuntu vm.
> >
> > On Tue, Feb 17, 2015 at 9:26 PM, Richard Spillane <r...@defend7.com> wrote:
> >
> >> I checked iptables and all rules are set to forward, so nothing should be
> >> blocked in the VM example. In the container example the port is explicitly
> >> EXPOSEd and other ports in a similar range (e.g., 8080) can be accessed
> >> just fine.
> >>
> >>> On Feb 17, 2015, at 8:56 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> >>>
> >>> Is it possible that you have iptables on the Ubuntu where you run your
> >>> broker?
> >>>
> >>> Try disabling iptables and see if it fixes the issue.
> >>>
> >>> On Tue, Feb 17, 2015 at 8:47 PM, Richard Spillane <r...@defend7.com> wrote:
> >>>
> >>>> So I would like to have two machines: one running zookeeper and a single
> >>>> kafka node and another machine running a producer. I want to use the basic
> >>>> commands mentioned in the Quick Start guide to do this. However, I keep
> >>>> getting connection closed exceptions in the producer.
> >>>>
> >>>> This is what I do:
> >>>> On the kafka/zookeeper machine:
> >>>> bin/zookeeper-server-start.sh config/zookeeper.properties &
> >>>> bin/kafka-server-start.sh config/server.properties &
> >>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
> >>>>
> >>>> …so far so good, now on the producer machine:
> >>>> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> >>>> [2015-02-17 20:43:28,466] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
> >>>> echo
> >>>> <press enter to send ‘echo’ above>
> >>>>
> >>>> …now it starts spewing the errors in the Producer Errors Appendix.
> >>>>
> >>>> What I don’t understand is why? I checked the default configurations and
> >>>> it is binding to all interfaces as the bind to localhost is commented out.
> >>>> I checked netstat and 9092 is open on the zookeeper/kafka machine. I have
> >>>> tried this with an Ubuntu VM and a container where the container hosts the
> >>>> zookeeper/kafka server and I have tried this with my native machine (OS X)
> >>>> and an Ubuntu VM where the VM is the zookeeper/kafka server. In both cases
> >>>> the same thing happens.
> >>>>
> >>>> I am just trying to get the simplest possible configuration where the
> >>>> producer is not on the same machine as the kafka queue up and running. How
> >>>> can I make this work? Thanks for any help.
> >>>>
> >>>> Producer Errors Appendix:
> >>>>
> >>>> [2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
> >>>> java.nio.channels.ClosedChannelException
> >>>>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> >>>>     at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> >>>>     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> >>>>     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >>>>     at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> >>>>     at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >>>>     at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> >>>>     at kafka.utils.Utils$.swallow(Utils.scala:172)
> >>>>     at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >>>>     at kafka.utils.Utils$.swallowError(Utils.scala:45)
> >>>>     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> >>>>     at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> >>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> >>>>     at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> >>>>     at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >>>>     at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> >>>>     at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> >>>> [2015-02-17 20:43:32,625] ERROR fetching topic metadata for topics [Set(test)] from broker
[ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> (kafka.utils.Utils$) > >>>> kafka.common.KafkaException: fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> Caused by: java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> ... 
12 more > >>>> [2015-02-17 20:43:32,627] WARN Fetching topic metadata with > correlation > >> id > >>>> 1 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > >> failed > >>>> (kafka.client.ClientUtils$) > >>>> java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > >>>> at > >>>> > >> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > >>>> at > >>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> [2015-02-17 20:43:32,628] ERROR Failed to collate messages by topic, > >>>> partition due to: fetching topic metadata for topics [Set(test)] from > >>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed > >>>> (kafka.producer.async.DefaultEventHandler) > >>>> [2015-02-17 20:43:32,734] WARN Fetching topic metadata with > correlation > >> id > >>>> 2 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > >> failed > >>>> (kafka.client.ClientUtils$) > >>>> java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > >>>> at > >>>> > >> > 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> [2015-02-17 20:43:32,735] ERROR fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> (kafka.utils.Utils$) > >>>> kafka.common.KafkaException: fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> Caused by: java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> ... 12 more > >>>> [2015-02-17 20:43:32,737] WARN Fetching topic metadata with > correlation > >> id > >>>> 3 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > >> failed > >>>> (kafka.client.ClientUtils$) > >>>> java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > >>>> at > >>>> 
> >> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > >>>> at > >>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> [2015-02-17 20:43:32,738] ERROR Failed to collate messages by topic, > >>>> partition due to: fetching topic metadata for topics [Set(test)] from > >>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed > >>>> (kafka.producer.async.DefaultEventHandler) > >>>> [2015-02-17 20:43:32,844] WARN Fetching topic metadata with > correlation > >> id > >>>> 4 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > >> failed > >>>> (kafka.client.ClientUtils$) > >>>> java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > 
>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> [2015-02-17 20:43:32,844] ERROR fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> (kafka.utils.Utils$) > >>>> kafka.common.KafkaException: fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at 
kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> Caused by: java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> ... 
12 more > >>>> [2015-02-17 20:43:32,846] WARN Fetching topic metadata with > correlation > >> id > >>>> 5 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > >> failed > >>>> (kafka.client.ClientUtils$) > >>>> java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > >>>> at > >>>> > >> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > >>>> at > >>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> [2015-02-17 20:43:32,847] ERROR Failed to collate messages by topic, > >>>> partition due to: fetching topic metadata for topics [Set(test)] from > >>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed > >>>> (kafka.producer.async.DefaultEventHandler) > >>>> [2015-02-17 20:43:32,953] WARN Fetching topic metadata with > correlation > >> id > >>>> 6 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > >> failed > >>>> (kafka.client.ClientUtils$) > >>>> java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > >>>> at > >>>> > >> > 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> [2015-02-17 20:43:32,954] ERROR fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> (kafka.utils.Utils$) > >>>> kafka.common.KafkaException: fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> Caused by: java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> ... 12 more > >>>> [2015-02-17 20:43:32,955] WARN Fetching topic metadata with > correlation > >> id > >>>> 7 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > >> failed > >>>> (kafka.client.ClientUtils$) > >>>> java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149) > >>>> at > >>>> 
> >> > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > >>>> at > >>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> [2015-02-17 20:43:32,957] ERROR Failed to collate messages by topic, > >>>> partition due to: fetching topic metadata for topics [Set(test)] from > >>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed > >>>> (kafka.producer.async.DefaultEventHandler) > >>>> [2015-02-17 20:43:33,063] WARN Fetching topic metadata with > correlation > >> id > >>>> 8 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > >> failed > >>>> (kafka.client.ClientUtils$) > >>>> java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > 
>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> [2015-02-17 20:43:33,064] ERROR fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> (kafka.utils.Utils$) > >>>> kafka.common.KafkaException: fetching topic metadata for topics > >>>> [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] > >> failed > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) > >>>> at > >>>> > >> > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78) > >>>> at kafka.utils.Utils$.swallow(Utils.scala:172) > >>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) > >>>> at 
kafka.utils.Utils$.swallowError(Utils.scala:45) > >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> Caused by: java.nio.channels.ClosedChannelException > >>>> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > >>>> at > >> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) > >>>> at > >>>> > >> > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) > >>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) > >>>> at > >>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) > >>>> ... 12 more > >>>> [2015-02-17 20:43:33,066] ERROR Failed to send requests for topics > test > >>>> with correlation ids in [0,8] > (kafka.producer.async.DefaultEventHandler) > >>>> [2015-02-17 20:43:33,067] ERROR Error in handling batch of 1 events > >>>> (kafka.producer.async.ProducerSendThread) > >>>> kafka.common.FailedToSendMessageException: Failed to send messages > >> after 3 > >>>> tries. 
> >>>> at > >>>> > >> > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) > >>>> at scala.collection.immutable.Stream.foreach(Stream.scala:547) > >>>> at > >>>> > >> > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) > >>>> at > >>>> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > >>>> > >>>> > >>>> > >> > >> > >