OK. Steve Miller helped me solve the problem. I needed to explicitly set 
advertised.host.name=192.168.241.128 in the broker configuration. The logs 
showed the producer could connect on port 9092, but when the broker told it 
which hosts to connect to in order to queue messages, the metadata contained 
unresolvable hostnames. Setting this property explicitly forced the broker to 
advertise the resolvable IP address of my VM.
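
For anyone who hits the same thing, here is roughly what the relevant section of config/server.properties on the broker VM looks like after the fix. The IP is of course specific to my VM, and my understanding from the 0.8.x docs is that when advertised.host.name is unset the broker advertises host.name if configured, otherwise the JVM's canonical hostname, which my Mac client could not resolve:

```properties
# config/server.properties on the broker VM (kafka_2.10-0.8.2.0)

# The port the socket server listens on
port=9092

# The hostname/IP the broker publishes to ZooKeeper for clients to use.
# If unset, the broker falls back to host.name (or the JVM's canonical
# hostname), which in my setup was not resolvable from the Mac client.
advertised.host.name=192.168.241.128
```

With the broker restarted, the console producer on the Mac is pointed at the same address, e.g. bin/kafka-console-producer.sh --broker-list 192.168.241.128:9092 --topic test.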

> On Feb 18, 2015, at 10:07 AM, Richard Spillane <r...@defend7.com> wrote:
> 
> I also tried running the producer from the Mac client again, but this time 
> with TRACE and DEBUG options un-commented from the log4j.properties file on 
> the VM server. It seems that the connection is established (on port 50045) 
> and bytes are being read from the client (192.168.241.1). Then subsequent 
> connections are made (on ports 50046, 50047, 50048, and 50049). I am guessing 
> these are retry attempts made by the producer. So it looks like the 
> connection is made, and then Kafka decides to close it for some reason, and 
> the client continues to retry.
> 
> Here are the server-side logs with debugging turned on: 
> =====================
> [2015-02-18 09:59:53,819] TRACE Processor id 1 selection time = 300542531 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:53,952] TRACE Processor id 2 selection time = 301409787 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,019] TRACE Processor id 0 selection time = 300632222 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,077] TRACE Processor id 0 selection time = 57586199 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,077] DEBUG Processor 0 listening to new connection from 
> /192.168.241.1:50045 (kafka.network.Processor)
> [2015-02-18 09:59:54,084] TRACE Processor id 0 selection time = 6156172 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,084] TRACE 36 bytes read from /192.168.241.1:50045 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,085] TRACE Processor id 0 selection time = 1154956 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,085] TRACE Socket server received response to send, 
> registering for write: 
> Response(0,Request(0,sun.nio.ch.SelectionKeyImpl@420433c6,null,1424282394084,/192.168.241.1:50045),kafka.network.BoundedByteBufferSend@21e32e06,SendAction)
>  (kafka.network.Processor)
> [2015-02-18 09:59:54,085] TRACE Processor id 0 selection time = 28607 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,086] TRACE 70 bytes written to /192.168.241.1:50045 
> using key sun.nio.ch.SelectionKeyImpl@420433c6 (kafka.network.Processor)
> [2015-02-18 09:59:54,086] TRACE Finished writing, registering for read on 
> connection /192.168.241.1:50045 (kafka.network.Processor)
> [2015-02-18 09:59:54,097] TRACE Processor id 0 selection time = 11043038 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,098] INFO Closing socket connection to /192.168.241.1. 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,098] DEBUG Closing connection from /192.168.241.1:50045 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,121] TRACE Processor id 1 selection time = 301719474 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,253] TRACE Processor id 2 selection time = 300837240 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,259] TRACE Processor id 1 selection time = 137306479 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,259] DEBUG Processor 1 listening to new connection from 
> /192.168.241.1:50046 (kafka.network.Processor)
> [2015-02-18 09:59:54,260] TRACE Processor id 1 selection time = 42838 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,260] TRACE 36 bytes read from /192.168.241.1:50046 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,262] TRACE Socket server received response to send, 
> registering for write: 
> Response(1,Request(1,sun.nio.ch.SelectionKeyImpl@1c630e29,null,1424282394260,/192.168.241.1:50046),kafka.network.BoundedByteBufferSend@2b36b44e,SendAction)
>  (kafka.network.Processor)
> [2015-02-18 09:59:54,262] TRACE Processor id 1 selection time = 48788 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,263] TRACE 70 bytes written to /192.168.241.1:50046 
> using key sun.nio.ch.SelectionKeyImpl@1c630e29 (kafka.network.Processor)
> [2015-02-18 09:59:54,263] TRACE Finished writing, registering for read on 
> connection /192.168.241.1:50046 (kafka.network.Processor)
> [2015-02-18 09:59:54,263] TRACE Processor id 1 selection time = 40185 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,264] INFO Closing socket connection to /192.168.241.1. 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,264] DEBUG Closing connection from /192.168.241.1:50046 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,369] TRACE Processor id 2 selection time = 115233690 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,369] DEBUG Processor 2 listening to new connection from 
> /192.168.241.1:50047 (kafka.network.Processor)
> [2015-02-18 09:59:54,370] TRACE Processor id 2 selection time = 43183 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,370] TRACE 36 bytes read from /192.168.241.1:50047 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,372] TRACE Socket server received response to send, 
> registering for write: 
> Response(2,Request(2,sun.nio.ch.SelectionKeyImpl@26ec47e9,null,1424282394370,/192.168.241.1:50047),kafka.network.BoundedByteBufferSend@626525f5,SendAction)
>  (kafka.network.Processor)
> [2015-02-18 09:59:54,372] TRACE Processor id 2 selection time = 50442 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,372] TRACE 70 bytes written to /192.168.241.1:50047 
> using key sun.nio.ch.SelectionKeyImpl@26ec47e9 (kafka.network.Processor)
> [2015-02-18 09:59:54,373] TRACE Finished writing, registering for read on 
> connection /192.168.241.1:50047 (kafka.network.Processor)
> [2015-02-18 09:59:54,373] TRACE Processor id 2 selection time = 157199 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,374] INFO Closing socket connection to /192.168.241.1. 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,374] DEBUG Closing connection from /192.168.241.1:50047 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,400] TRACE Processor id 0 selection time = 301797382 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,480] TRACE Processor id 0 selection time = 79923364 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,481] DEBUG Processor 0 listening to new connection from 
> /192.168.241.1:50048 (kafka.network.Processor)
> [2015-02-18 09:59:54,481] TRACE Processor id 0 selection time = 42682 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,482] TRACE 36 bytes read from /192.168.241.1:50048 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,483] TRACE Socket server received response to send, 
> registering for write: 
> Response(0,Request(0,sun.nio.ch.SelectionKeyImpl@29b9d6a0,null,1424282394482,/192.168.241.1:50048),kafka.network.BoundedByteBufferSend@67b9b7c6,SendAction)
>  (kafka.network.Processor)
> [2015-02-18 09:59:54,484] TRACE Processor id 0 selection time = 47854 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,484] TRACE 70 bytes written to /192.168.241.1:50048 
> using key sun.nio.ch.SelectionKeyImpl@29b9d6a0 (kafka.network.Processor)
> [2015-02-18 09:59:54,485] TRACE Finished writing, registering for read on 
> connection /192.168.241.1:50048 (kafka.network.Processor)
> [2015-02-18 09:59:54,485] TRACE Processor id 0 selection time = 59405 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,487] INFO Closing socket connection to /192.168.241.1. 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,488] DEBUG Closing connection from /192.168.241.1:50048 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,566] TRACE Processor id 1 selection time = 300967115 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,593] TRACE Processor id 1 selection time = 27119153 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,594] DEBUG Processor 1 listening to new connection from 
> /192.168.241.1:50049 (kafka.network.Processor)
> [2015-02-18 09:59:54,595] TRACE Processor id 1 selection time = 45021 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,595] TRACE 36 bytes read from /192.168.241.1:50049 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,597] TRACE Socket server received response to send, 
> registering for write: 
> Response(1,Request(1,sun.nio.ch.SelectionKeyImpl@46a1aea1,null,1424282394595,/192.168.241.1:50049),kafka.network.BoundedByteBufferSend@20cff041,SendAction)
>  (kafka.network.Processor)
> [2015-02-18 09:59:54,597] TRACE Processor id 1 selection time = 53865 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,598] TRACE 70 bytes written to /192.168.241.1:50049 
> using key sun.nio.ch.SelectionKeyImpl@46a1aea1 (kafka.network.Processor)
> [2015-02-18 09:59:54,598] TRACE Finished writing, registering for read on 
> connection /192.168.241.1:50049 (kafka.network.Processor)
> [2015-02-18 09:59:54,599] TRACE Processor id 1 selection time = 367805 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,599] INFO Closing socket connection to /192.168.241.1. 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,600] DEBUG Closing connection from /192.168.241.1:50049 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,676] TRACE Processor id 2 selection time = 300650923 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,790] TRACE Processor id 0 selection time = 301307005 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,902] TRACE Processor id 1 selection time = 301191605 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:54,978] TRACE Processor id 2 selection time = 301717799 ns 
> (kafka.network.Processor)
> [2015-02-18 09:59:55,092] TRACE Processor id 0 selection time = 301280989 ns 
> (kafka.network.Processor)
> 
>> On Feb 18, 2015, at 9:40 AM, Jiangjie Qin <j...@linkedin.com.INVALID> wrote:
>> 
>> I think your log did show that you are connecting to localhost:9092:
>> 
>> [2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id
>> 0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092]
>> failed (kafka.client.ClientUtils$)
>> java.nio.channels.ClosedChannelException
>> 
>> 
>> Can you check again?
>> 
>> -Jiangjie (Becket) Qin
>> 
>> On 2/17/15, 10:19 PM, "Gwen Shapira" <gshap...@cloudera.com> wrote:
>> 
>>> Time to debug Kafka then :)
>>> 
>>> Does the topic you are producing to exists? (you can check with
>>> kafka-topics tool)
>>> If not, do you have auto-creation enabled?
>>> 
>>> Which version are you on? Is it possible you ran into KAFKA-1738?
>>> 
>>> 
>>> On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane <r...@defend7.com> wrote:
>>> 
>>>> Telnet seems to be able to connect from the Mac to the VM and from the
>>>> VM
>>>> to the VM:
>>>> 
>>>> From Mac to VM:
>>>> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet 192.168.241.128
>>>> 9092
>>>> Trying 192.168.241.128...
>>>> Connected to 192.168.241.128.
>>>> Escape character is '^]'.
>>>> 
>>>> From VM to VM:
>>>> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet localhost 9092
>>>> Trying ::1...
>>>> Connected to localhost.
>>>> Escape character is '^]'.
>>>> 
>>>> From VM to Mac:
>>>> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet 192.168.1.27 9092
>>>> Trying 192.168.1.27...
>>>> telnet: Unable to connect to remote host: Connection refused
>>>> 
>>>> From Mac to Mac:
>>>> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet localhost 9092
>>>> Trying ::1...
>>>> telnet: connect to address ::1: Connection refused
>>>> Trying 127.0.0.1...
>>>> telnet: connect to address 127.0.0.1: Connection refused
>>>> telnet: Unable to connect to remote host
>>>> 
>>>> 
>>>>> On Feb 17, 2015, at 10:03 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>>>>> 
>>>>> What happens when you telnet to port 9092? try it from both your mac
>>>> and
>>>>> the ubuntu vm.
>>>>> 
>>>>> 
>>>>> On Tue, Feb 17, 2015 at 9:26 PM, Richard Spillane <r...@defend7.com> wrote:
>>>>> 
>>>>>> I checked iptables and all rules are set to forward, so nothing
>>>> should
>>>> be
>>>>>> blocked in the VM example. In the container example the port is
>>>> explicitly
>>>>>> EXPOSEd and other ports in a similar range (e.g., 8080) can be
>>>> accessed
>>>>>> just fine.
>>>>>> 
>>>>>>> On Feb 17, 2015, at 8:56 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>>>>>>> 
>>>>>>> Is it possible that you have iptables on the Ubuntu where you run
>>>> your
>>>>>>> broker?
>>>>>>> 
>>>>>>> Try disabling iptables and see if it fixes the issue.
>>>>>>> 
>>>>>>> On Tue, Feb 17, 2015 at 8:47 PM, Richard Spillane <r...@defend7.com> wrote:
>>>>>>> 
>>>>>>>> So I would like to have two machines: one running zookeeper and a
>>>> single
>>>>>>>> kafka node and another machine running a producer. I want to use
>>>> the
>>>>>> basic
>>>>>>>> commands mentioned in the Quick Start guide to do this. However, I
>>>> keep
>>>>>>>> getting connection closed exceptions in the producer.
>>>>>>>> 
>>>>>>>> This is what I do:
>>>>>>>> On the kafka/zookeeper machine:
>>>>>>>> bin/zookeeper-server-start.sh config/zookeeper.properties &
>>>>>>>> bin/kafka-server-start.sh config/server.properties &
>>>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181
>>>>>>>> --replication-factor 1 --partitions 1 --topic test
>>>>>>>> 
>>>>>>>> …so far so good, now on the producer machine:
>>>>>>>> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
>>>> test
>>>>>>>> [2015-02-17 20:43:28,466] WARN Property topic is not valid
>>>>>>>> (kafka.utils.VerifiableProperties)
>>>>>>>> echo
>>>>>>>> <press enter to send 'echo' above>
>>>>>>>> 
>>>>>>>> …now it starts spewing the errors in the Producer Errors Appendix.
>>>>>>>> 
>>>>>>>> What I don't understand is why. I checked the default
>>>> configurations
>>>> and
>>>>>>>> it is binding to all interfaces as the bind to localhost is
>>>> commented
>>>>>> out.
>>>>>>>> I checked netstat and 9092 is open on the zookeeper/kafka machine.
>>>> I
>>>>>> have
>>>>>>>> tried this with an Ubuntu VM and a container where the container
>>>> hosts
>>>>>> the
>>>>>>>> zookeeper/kafka server and I have tried this with my native machine
>>>> (OS
>>>>>> X)
>>>>>>>> and an Ubuntu VM where the VM is the zookeeper/kafka server. In
>>>> both
>>>>>> cases
>>>>>>>> the same thing happens.
>>>>>>>> 
>>>>>>>> I am just trying to get the simplest possible configuration where
>>>> the
>>>>>>>> producer is not on the same machine as the kafka queue up and
>>>> running.
>>>>>> How
>>>>>>>> can I make this work? Thanks for any help.
>>>>>>>> 
>>>>>>>> Producer Errors Appendix:
>>>>>>>> 
>>>>>>>> [2015-02-17 20:43:32,622] WARN Fetching topic metadata with
>>>> correlation
>>>>>> id
>>>>>>>> 0 for topics [Set(test)] from broker
>>>> [id:0,host:localhost,port:9092]
>>>>>> failed
>>>>>>>> (kafka.client.ClientUtils$)
>>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>>      at 
>>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>>      at
>>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>>> cer.scala:72)
>>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>>      at
>>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>>> 2)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(D
>>>> efaultEventHandler.scala:67)
>>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>>> :67)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>>> ala:105)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:88)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:68)
>>>>>>>>      at 
>>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>>> scala:67)
>>>>>>>>      at
>>>>>>>> 
>>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>>> [2015-02-17 20:43:32,625] ERROR fetching topic metadata for topics
>>>>>>>> [Set(test)] from broker
>>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>>> failed
>>>>>>>> (kafka.utils.Utils$)
>>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>>>> [Set(test)] from broker
>>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>>> failed
>>>>>>>>      at
>>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>>> 2)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(D
>>>> efaultEventHandler.scala:67)
>>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>>> :67)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>>> ala:105)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:88)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:68)
>>>>>>>>      at 
>>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>>> scala:67)
>>>>>>>>      at
>>>>>>>> 
>>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>>>      at 
>>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>>      at
>>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>>> cer.scala:72)
>>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>>      at
>>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>>      ... 12 more
>>>>>>>> [2015-02-17 20:43:32,627] WARN Fetching topic metadata with
>>>> correlation
>>>>>> id
>>>>>>>> 1 for topics [Set(test)] from broker
>>>> [id:0,host:localhost,port:9092]
>>>>>> failed
>>>>>>>> (kafka.client.ClientUtils$)
>>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>>      at 
>>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>>      at
>>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>>> cer.scala:72)
>>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>>      at
>>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>>> 2)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition
>>>> Info.scala:49)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven
>>>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>>> pply(DefaultEventHandler.scala:150)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>>> pply(DefaultEventHandler.scala:149)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal
>>>> a:59)
>>>>>>>>      at
>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent
>>>> Handler.scala:149)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv
>>>> entHandler.scala:95)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>>> :72)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>>> ala:105)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:88)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:68)
>>>>>>>>      at 
>>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>>> scala:67)
>>>>>>>>      at
>>>>>>>> 
>>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>>> [2015-02-17 20:43:32,628] ERROR Failed to collate messages by
>>>> topic,
>>>>>>>> partition due to: fetching topic metadata for topics [Set(test)]
>>>> from
>>>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>>>>>>> (kafka.producer.async.DefaultEventHandler)
>>>>>>>> [2015-02-17 20:43:32,734] WARN Fetching topic metadata with
>>>> correlation
>>>>>> id
>>>>>>>> 2 for topics [Set(test)] from broker
>>>> [id:0,host:localhost,port:9092]
>>>>>> failed
>>>>>>>> (kafka.client.ClientUtils$)
>>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>>      at 
>>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>>      at
>>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>>> cer.scala:72)
>>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>>      at
>>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>>> 2)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>>> efaultEventHandler.scala:78)
>>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>>> :78)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>>> ala:105)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:88)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:68)
>>>>>>>>      at 
>>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>>> scala:67)
>>>>>>>>      at
>>>>>>>> 
>>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>>> [2015-02-17 20:43:32,735] ERROR fetching topic metadata for topics
>>>>>>>> [Set(test)] from broker
>>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>>> failed
>>>>>>>> (kafka.utils.Utils$)
>>>>>>>> kafka.common.KafkaException: fetching topic metadata for topics
>>>>>>>> [Set(test)] from broker
>>>> [ArrayBuffer(id:0,host:localhost,port:9092)]
>>>>>> failed
>>>>>>>>      at
>>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>>> 2)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D
>>>> efaultEventHandler.scala:78)
>>>>>>>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>>>>>>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>>>>>>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>>> :78)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>>> ala:105)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:88)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:68)
>>>>>>>>      at 
>>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>>> scala:67)
>>>>>>>>      at
>>>>>>>> 
>>>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>>>>>>>> Caused by: java.nio.channels.ClosedChannelException
>>>>>>>>      at 
>>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>>      at
>>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>>> cer.scala:72)
>>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>>      at
>>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>>      ... 12 more
>>>>>>>> [2015-02-17 20:43:32,737] WARN Fetching topic metadata with
>>>> correlation
>>>>>> id
>>>>>>>> 3 for topics [Set(test)] from broker
>>>> [id:0,host:localhost,port:9092]
>>>>>> failed
>>>>>>>> (kafka.client.ClientUtils$)
>>>>>>>> java.nio.channels.ClosedChannelException
>>>>>>>>      at 
>>>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>>>>>>>      at
>>>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu
>>>> cer.scala:72)
>>>>>>>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>>>>>>>      at
>>>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8
>>>> 2)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition
>>>> Info.scala:49)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven
>>>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>>> pply(DefaultEventHandler.scala:150)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a
>>>> pply(DefaultEventHandler.scala:149)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal
>>>> a:59)
>>>>>>>>      at
>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent
>>>> Handler.scala:149)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv
>>>> entHandler.scala:95)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala
>>>> :72)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc
>>>> ala:105)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:88)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr
>>>> oducerSendThread.scala:68)
>>>>>>>>      at 
>>>> scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>>>>>>>      at
>>>>>>>> 
>>>>>> 
>>>> 
>>>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.
>>>> scala:67)
>>>>>>>>      at
>>>>>>>> 
>>      at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>> [2015-02-17 20:43:32,738] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
>> [2015-02-17 20:43:32,844] WARN Fetching topic metadata with correlation id 4 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
>> java.nio.channels.ClosedChannelException
>>      at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>      at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>      at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>      at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>      at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>      at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>>      at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>      at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>      at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>      at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>      at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>      at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>> [2015-02-17 20:43:32,844] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.utils.Utils$)
>> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>      at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
>>      at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>      at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
>>      at kafka.utils.Utils$.swallow(Utils.scala:172)
>>      at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>>      at kafka.utils.Utils$.swallowError(Utils.scala:45)
>>      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
>>      at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>      at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>      at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>      at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>      at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>      at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>> Caused by: java.nio.channels.ClosedChannelException
>>      at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>      at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>      at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>      at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>      ... 12 more
>> [2015-02-17 20:43:32,846] WARN Fetching topic metadata with correlation id 5 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
>> java.nio.channels.ClosedChannelException
>>      at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>>      at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>      at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>>      at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>>      at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>>      at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>>      at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>>      at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186)
>>      at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>>      at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:149)
>>      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>      at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:149)
>>      at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:95)
>>      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>>      at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>      at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>      at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>      at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>      at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>      at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>> [2015-02-17 20:43:32,847] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
>> [2015-02-17 20:43:32,953] WARN Fetching topic metadata with correlation id 6 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
>> java.nio.channels.ClosedChannelException
>>      [stack trace identical to the one for correlation id 4]
>> [2015-02-17 20:43:32,954] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.utils.Utils$)
>> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>      [stack trace identical to the KafkaException above, caused by java.nio.channels.ClosedChannelException]
>> [2015-02-17 20:43:32,955] WARN Fetching topic metadata with correlation id 7 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
>> java.nio.channels.ClosedChannelException
>>      [stack trace identical to the one for correlation id 5]
>> [2015-02-17 20:43:32,957] ERROR Failed to collate messages by topic, partition due to: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.producer.async.DefaultEventHandler)
>> [2015-02-17 20:43:33,063] WARN Fetching topic metadata with correlation id 8 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] failed (kafka.client.ClientUtils$)
>> java.nio.channels.ClosedChannelException
>>      [stack trace identical to the one for correlation id 4]
>> [2015-02-17 20:43:33,064] ERROR fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed (kafka.utils.Utils$)
>> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
>>      [stack trace identical to the KafkaException above, caused by java.nio.channels.ClosedChannelException]
>> [2015-02-17 20:43:33,066] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
>> [2015-02-17 20:43:33,067] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
>> kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>>      at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>>      at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>>      at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>>      at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>>      at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>>      at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>>      at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> 
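For anyone who lands on this thread with the same ClosedChannelException retry loop: the key symptom in the quoted log is `host:localhost` in the broker list, meaning the metadata response pointed the producer back at an address it could not reach. The fix described at the top of the thread is to pin the broker's advertised address in the broker config. A minimal sketch for `config/server.properties` (the IP below is specific to the original poster's VM; substitute your broker's externally resolvable address):

```properties
# config/server.properties (Kafka 0.8.x broker)

# Address the broker publishes to ZooKeeper and returns in
# topic-metadata responses. If unset, it can fall back to a
# hostname (here "localhost") that remote producers cannot use.
advertised.host.name=192.168.241.128

# Optionally pin the advertised port as well (defaults to "port")
#advertised.port=9092
```

After restarting the broker, the metadata response should carry 192.168.241.128:9092 instead of localhost:9092, and the producer's retries should stop.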
