Sure, no problem. The output in my earlier e-mail did come from a run against 
‘localhost’, but that was just a typo on my part; I normally use the right IP. 
I ran it again, this time with the VM’s IP, from my Mac (the client). Thanks 
for taking a look! Here it is:

Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ bin/kafka-console-producer.sh 
--broker-list 192.168.241.128:9092 --topic test
[2015-02-18 09:45:49,148] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
and boy are my arms tired
[2015-02-18 09:45:58,533] WARN Failed to send producer request with correlation 
id 2 to broker 0 with data for partitions [test,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-02-18 09:45:58,649] WARN Failed to send producer request with correlation 
id 5 to broker 0 with data for partitions [test,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-02-18 09:45:58,760] WARN Failed to send producer request with correlation 
id 8 to broker 0 with data for partitions [test,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-02-18 09:45:58,868] WARN Failed to send producer request with correlation 
id 11 to broker 0 with data for partitions [test,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at 
kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-02-18 09:45:58,980] ERROR Failed to send requests for topics test with 
correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
[2015-02-18 09:45:58,981] ERROR Error in handling batch of 1 events 
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 
tries.
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
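
For what it’s worth, my understanding is that in 0.8.x the broker host the
producer uses for the actual send comes from the broker’s own configuration
(host.name / advertised.host.name, as registered in ZooKeeper), not from
--broker-list, so if the broker registers itself as localhost the send can
still fail even though the bootstrap connection to 192.168.241.128 works.
A sketch of the config/server.properties lines on the VM that control this
(the IP is just the one used above; adjust as needed):

# config/server.properties on the broker VM (sketch, Kafka 0.8.2)
# Host/port handed to clients in topic metadata:
advertised.host.name=192.168.241.128
advertised.port=9092
# Leave host.name commented out so the broker keeps binding to all interfaces:
#host.name=
port=9092

And, per Gwen’s earlier question, the topic can be checked from the Mac with
the bundled tool (assuming ZooKeeper on the VM is reachable on 2181):

bin/kafka-topics.sh --describe --zookeeper 192.168.241.128:2181 --topic test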

> On Feb 18, 2015, at 9:40 AM, Jiangjie Qin <j...@linkedin.com.INVALID> wrote:
> 
> I think your log did show that you are connecting to localhost:9092:
> 
> [2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092]
> failed (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
> 
> 
> Can you check again?
> 
> -Jiangjie (Becket) Qin
> 
> On 2/17/15, 10:19 PM, "Gwen Shapira" <gshap...@cloudera.com> wrote:
> 
>> Time to debug Kafka then :)
>> 
>> Does the topic you are producing to exists? (you can check with
>> kafka-topics tool)
>> If not, do you have auto-creation enabled?
>> 
>> Which version are you on? Is it possible you ran into KAFKA-1738?
>> 
>> 
>> On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane <r...@defend7.com>
>> wrote:
>> 
>>> Telnet seems to be able to connect from the Mac to the VM and from the
>>> VM
>>> to the VM:
>>> 
>>> From Mac to VM:
>>> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet 192.168.241.128
>>> 9092
>>> Trying 192.168.241.128...
>>> Connected to 192.168.241.128.
>>> Escape character is '^]'.
>>> 
>>> From VM to VM:
>>> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet localhost 9092
>>> Trying ::1...
>>> Connected to localhost.
>>> Escape character is '^]'.
>>> 
>>> From VM to Mac:
>>> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet 192.168.1.27 9092
>>> Trying 192.168.1.27...
>>> telnet: Unable to connect to remote host: Connection refused
>>> 
>>> From Mac to Mac:
>>> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet localhost 9092
>>> Trying ::1...
>>> telnet: connect to address ::1: Connection refused
>>> Trying 127.0.0.1...
>>> telnet: connect to address 127.0.0.1: Connection refused
>>> telnet: Unable to connect to remote host
>>> 
>>> 
>>>> On Feb 17, 2015, at 10:03 PM, Gwen Shapira <gshap...@cloudera.com>
>>> wrote:
>>>> 
>>>> What happens when you telnet to port 9092? try it from both your mac
>>> and
>>>> the ubuntu vm.
>>>> 
>>>> 
>>>> On Tue, Feb 17, 2015 at 9:26 PM, Richard Spillane <r...@defend7.com>
>>> wrote:
>>>> 
>>>>> I checked iptables and all rules are set to forward, so nothing
>>> should
>>> be
>>>>> blocked in the VM example. In the container example the port is
>>> explicitly
>>>>> EXPOSEd and other ports in a similar range (e.g., 8080) can be
>>> accessed
>>>>> just fine.
>>>>> 
>>>>>> On Feb 17, 2015, at 8:56 PM, Gwen Shapira <gshap...@cloudera.com>
>>> wrote:
>>>>>> 
>>>>>> Is it possible that you have iptables on the Ubuntu where you run
>>> your
>>>>>> broker?
>>>>>> 
>>>>>> Try disabling iptables and see if it fixes the issue.
>>>>>> 
>>>>>> On Tue, Feb 17, 2015 at 8:47 PM, Richard Spillane <r...@defend7.com>
>>>>> wrote:
>>>>>> 
>>>>>>> So I would like to have two machines: one running zookeeper and a
>>> single
>>>>>>> kafka node and another machine running a producer. I want to use
>>> the
>>>>> basic
>>>>>>> commands mentioned in the Quick Start guide to do this. However, I
>>> keep
>>>>>>> getting connection closed exceptions in the producer.
>>>>>>> 
>>>>>>> This is what I do:
>>>>>>> On the kafka/zookeeper machine:
>>>>>>> bin/zookeeper-server-start.sh config/zookeeper.properties &
>>>>>>> bin/kafka-server-start.sh config/server.properties &
>>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181
>>>>>>> --replication-factor 1 --partitions 1 --topic test
>>>>>>> 
>>>>>>> …so far so good, now on the producer machine:
>>>>>>> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>> test
>>>>>>> [2015-02-17 20:43:28,466] WARN Property topic is not valid
>>>>>>> (kafka.utils.VerifiableProperties)
>>>>>>> echo
>>>>>>> <press enter to send 'echo' above>
>>>>>>> 
>>>>>>> …now it starts spewing the errors in the Producer Errors Appendix.
>>>>>>> 
>>>>>>> What I don't understand is why? I checked the default
>>> configurations
>>> and
>>>>>>> it is binding to all interfaces as the bind to localhost is
>>> commented
>>>>> out.
>>>>>>> I checked netstat and 9092 is open on the zookeeper/kafka machine.
>>> I
>>>>> have
>>>>>>> tried this with an Ubuntu VM and a container where the container
>>> hosts
>>>>> the
>>>>>>> zookeeper/kafka server and I have tried this with my native machine
>>> (OS
>>>>> X)
>>>>>>> and an Ubuntu VM where the VM is the zookeeper/kafka server. In
>>> both
>>>>> cases
>>>>>>> the same thing happens.
>>>>>>> 
>>>>>>> I am just trying to get the simplest possible configuration where
>>> the
>>>>>>> producer is not on the same machine as the kafka queue up and
>>> running.
>>>>> How
>>>>>>> can I make this work? Thanks for any help.
>>>>>>> 
>>>>>>> Producer Errors Appendix:
>>>>>>> 
>>>>>>> [2015-02-17 20:43:32,622] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 0 for topics [Set(test)] from broker
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(D
>>> efaultEventHandler.scala:67)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :67)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:32,625] ERROR fetching topic metadata for topics
>>>>>>> [Set(test)] from broker
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>> (kafka.utils.Utils$)
>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>>> [Set(test)] from broker
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(D
>>> efaultEventHandler.scala:67)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :67)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      ... 12 more
>>>>>>> [2015-02-17 20:43:32,627] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 1 for topics [Set(test)] from broker
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition
>>> Info.scala:49)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven
>>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>> pply(DefaultEventHandler.scala:150)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>> pply(DefaultEventHandler.scala:149)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal
>>> a:59)
>>>>>>>      at
>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent
>>> Handler.scala:149)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv
>>> entHandler.scala:95)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:32,628] ERROR Failed to collate messages by
>>> topic,
>>>>>>> partition due to: fetching topic metadata for topics [Set(test)]
>>> from
>>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>>>>>> (kafka.producer.async.DefaultEventHandler)
>>>>>>> [2015-02-17 20:43:32,734] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 2 for topics [Set(test)] from broker
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>> efaultEventHandler.scala:78)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :78)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:32,735] ERROR fetching topic metadata for topics
>>>>>>> [Set(test)] from broker
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>> (kafka.utils.Utils$)
>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>>> [Set(test)] from broker
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>> efaultEventHandler.scala:78)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :78)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      ... 12 more
>>>>>>> [2015-02-17 20:43:32,737] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 3 for topics [Set(test)] from broker
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition
>>> Info.scala:49)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven
>>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>> pply(DefaultEventHandler.scala:150)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>> pply(DefaultEventHandler.scala:149)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal
>>> a:59)
>>>>>>>      at
>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent
>>> Handler.scala:149)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv
>>> entHandler.scala:95)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:32,738] ERROR Failed to collate messages by
>>> topic,
>>>>>>> partition due to: fetching topic metadata for topics [Set(test)]
>>> from
>>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>>>>>> (kafka.producer.async.DefaultEventHandler)
>>>>>>> [2015-02-17 20:43:32,844] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 4 for topics [Set(test)] from broker
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>> efaultEventHandler.scala:78)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :78)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:32,844] ERROR fetching topic metadata for topics
>>>>>>> [Set(test)] from broker
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>> (kafka.utils.Utils$)
>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>>> [Set(test)] from broker
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>> efaultEventHandler.scala:78)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :78)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      ... 12 more
>>>>>>> [2015-02-17 20:43:32,846] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 5 for topics [Set(test)] from broker
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition
>>> Info.scala:49)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven
>>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>> pply(DefaultEventHandler.scala:150)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>> pply(DefaultEventHandler.scala:149)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal
>>> a:59)
>>>>>>>      at
>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent
>>> Handler.scala:149)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv
>>> entHandler.scala:95)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:32,847] ERROR Failed to collate messages by
>>> topic,
>>>>>>> partition due to: fetching topic metadata for topics [Set(test)]
>>> from
>>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>>>>>> (kafka.producer.async.DefaultEventHandler)
>>>>>>> [2015-02-17 20:43:32,953] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 6 for topics [Set(test)] from broker
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>> efaultEventHandler.scala:78)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :78)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:32,954] ERROR fetching topic metadata for topics
>>>>>>> [Set(test)] from broker
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>> (kafka.utils.Utils$)
>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>>> [Set(test)] from broker
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>> efaultEventHandler.scala:78)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :78)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      ... 12 more
>>>>>>> [2015-02-17 20:43:32,955] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 7 for topics [Set(test)] from broker
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition
>>> Info.scala:49)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven
>>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>> pply(DefaultEventHandler.scala:150)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>> pply(DefaultEventHandler.scala:149)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal
>>> a:59)
>>>>>>>      at
>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent
>>> Handler.scala:149)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv
>>> entHandler.scala:95)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:32,957] ERROR Failed to collate messages by 
>>> topic,
>>>>>>> partition due to: fetching topic metadata for topics [Set(test)] 
>>> from
>>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>>>>>> (kafka.producer.async.DefaultEventHandler)
>>>>>>> [2015-02-17 20:43:33,063] WARN Fetching topic metadata with
>>> correlation
>>>>> id
>>>>>>> 8 for topics [Set(test)] from broker 
>>> [id:0,host:localhost,port:9092]
>>>>> failed
>>>>>>> (kafka.client.ClientUtils$)
>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>> efaultEventHandler.scala:78)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :78)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> [2015-02-17 20:43:33,064] ERROR fetching topic metadata for topics
>>>>>>> [Set(test)] from broker 
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>> (kafka.utils.Utils$)
>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>>> [Set(test)] from broker 
>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>> failed
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>> 2)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>> efaultEventHandler.scala:78)
>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :78)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>>      at 
>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>      at
>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>> cer.scala:72)
>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>      at
>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>      ... 12 more
>>>>>>> [2015-02-17 20:43:33,066] ERROR Failed to send requests for topics
>>> test
>>>>>>> with correlation ids in [0,8]
>>> (kafka.producer.async.DefaultEventHandler)
>>>>>>> [2015-02-17 20:43:33,067] ERROR Error in handling batch of 1 events
>>>>>>> (kafka.producer.async.ProducerSendThread)
>>>>>>> kafka.common.FailedToSendMessageException: Failed to send messages
>>>>> after 3
>>>>>>> tries.
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>> :90)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>> ala:105)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:88)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>> oducerSendThread.scala:68)
>>>>>>>      at 
>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>      at
>>>>>>> 
>>>>> 
>>> 
>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>> scala:67)
>>>>>>>      at
>>>>>>> 
>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
> 
