Sure no problem. I actually did run the ‘localhost’ command to generate output for the e-mail, but that was a typo: I always use the right IP. But I ran it again, this time with the IP of the VM. I ran the command from my Mac (the client). Thanks for taking a look! Here it is:
Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ bin/kafka-console-producer.sh --broker-list 192.168.241.128:9092 --topic test [2015-02-18 09:45:49,148] WARN Property topic is not valid (kafka.utils.VerifiableProperties) and boy are my arms tired [2015-02-18 09:45:58,533] WARN Failed to send producer request with correlation id 2 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer.send(SyncProducer.scala:101) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2015-02-18 09:45:58,649] WARN Failed to send producer request with correlation id 5 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer.send(SyncProducer.scala:101) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2015-02-18 09:45:58,760] WARN Failed to send producer request with correlation id 8 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer.send(SyncProducer.scala:101) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2015-02-18 09:45:58,868] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [test,0] (kafka.producer.async.DefaultEventHandler) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at kafka.producer.SyncProducer.send(SyncProducer.scala:101) at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) [2015-02-18 09:45:58,980] ERROR Failed to send requests for topics test with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler) [2015-02-18 09:45:58,981] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread) kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries. at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) at scala.collection.immutable.Stream.foreach(Stream.scala:547) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) > On Feb 18, 2015, at 9:40 AM, Jiangjie Qin <j...@linkedin.com.INVALID> wrote: > > I think your log did show that your are connecting to localhost:9092: > > [2015-02-17 20:43:32,622] WARN Fetching topic metadata with correlation id > 0 for topics [Set(test)] from broker [id:0,host:localhost,port:9092] > failed (kafka.client.ClientUtils$) > java.nio.channels.ClosedChannelException > > > Can you check again? > > -Jiangjie (Becket) Qin > > On 2/17/15, 10:19 PM, "Gwen Shapira" <gshap...@cloudera.com> wrote: > >> Time to debug Kafka then :) >> >> Does the topic you are producing to exists? (you can check with >> kafka-topics tool) >> If not, do you have auto-creation enabled? >> >> Which version are you on? Is it possible you ran into KAFKA-1738? >> >> >> On Tue, Feb 17, 2015 at 10:08 PM, Richard Spillane <r...@defend7.com> >> wrote: >> >>> Telnet seems to be able to connect from the Mac to the VM and from the >>> VM >>> to the VM: >>> >>> From Mac to VM: >>> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet 192.168.241.128 >>> 9092 >>> Trying 192.168.241.128... >>> Connected to 192.168.241.128. >>> Escape character is '^]¹. >>> >>> From VM to VM: >>> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet localhost 9092 >>> Trying ::1... >>> Connected to localhost. >>> Escape character is '^]¹. >>> >>> From VM to Mac: >>> rick@ubuntu:~/kafka_2.10-0.8.2.0$ telnet 192.168.1.27 9092 >>> Trying 192.168.1.27... >>> telnet: Unable to connect to remote host: Connection refused >>> >>> From Mac to Mac: >>> Richards-MacBook-Air:kafka_2.10-0.8.2.0 rick$ telnet localhost 9092 >>> Trying ::1... >>> telnet: connect to address ::1: Connection refused >>> Trying 127.0.0.1... >>> telnet: connect to address 127.0.0.1: Connection refused >>> telnet: Unable to connect to remote host >>> >>> >>>> On Feb 17, 2015, at 10:03 PM, Gwen Shapira <gshap...@cloudera.com> >>> wrote: >>>> >>>> What happens when you telnet to port 9092? try it from both your mac >>> and >>>> the ubuntu vm. >>>> >>>> >>>> On Tue, Feb 17, 2015 at 9:26 PM, Richard Spillane <r...@defend7.com> >>> wrote: >>>> >>>>> I checked iptables and all rules are set to forward, so nothing >>> should >>> be >>>>> blocked in the VM example. In the container example the port is >>> explicitly >>>>> EXPOSEd and other ports in a similar range (e.g., 8080) can be >>> accessed >>>>> just fine. >>>>> >>>>>> On Feb 17, 2015, at 8:56 PM, Gwen Shapira <gshap...@cloudera.com> >>> wrote: >>>>>> >>>>>> Is it possible that you have iptables on the Ubuntu where you run >>> your >>>>>> broker? >>>>>> >>>>>> Try disabling iptables and see if it fixes the issue. >>>>>> >>>>>> On Tue, Feb 17, 2015 at 8:47 PM, Richard Spillane <r...@defend7.com> >>>>> wrote: >>>>>> >>>>>>> So I would like to have two machines: one running zookeeper and a >>> single >>>>>>> kafka node and another machine running a producer. I want to use >>> the >>>>> basic >>>>>>> commands mentioned in the Quick Start guide to do this. However, I >>> keep >>>>>>> getting connection closed exceptions in the producer. >>>>>>> >>>>>>> This is what I do: >>>>>>> On the kafka/zookeeper machine: >>>>>>> bin/zookeeper-server-start.sh config/zookeeper.properties & >>>>>>> bin/kafka-server-start.sh config/server.properties & >>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 >>>>>>> --replication-factor 1 --partitions 1 --topic test >>>>>>> >>>>>>> Šso far so good, now on the producer machine: >>>>>>> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic >>> test >>>>>>> [2015-02-17 20:43:28,466] WARN Property topic is not valid >>>>>>> (kafka.utils.VerifiableProperties) >>>>>>> echo >>>>>>> <press enter to send Œecho¹ above> >>>>>>> >>>>>>> Šnow it starts spewing the errors in the Producer Errors Appendix. >>>>>>> >>>>>>> What I don¹t understand is why? I checked the default >>> configurations >>> and >>>>>>> it is binding to all interfaces as the bind to localhost is >>> commented >>>>> out. >>>>>>> I checked netstat and 9092 is open on the zookeeper/kafka machine. >>> I >>>>> have >>>>>>> tried this with an Ubuntu VM and a container where the container >>> hosts >>>>> the >>>>>>> zookeeper/kafka server and I have tried this with my native machine >>> (OS >>>>> X) >>>>>>> and an Ubuntu VM where the VM is the zookeeper/kafka server. In >>> both >>>>> cases >>>>>>> the same thing happens. >>>>>>> >>>>>>> I am just trying to get the simplest possible configuration where >>> the >>>>>>> producer is not on the same machine as the kafka queue up and >>> running. >>>>> How >>>>>>> can I make this work? Thanks for any help. >>>>>>> >>>>>>> Producer Erros Appendix: >>>>>>> >>>>>>> [2015-02-17 20:43:32,622] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 0 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(D >>> efaultEventHandler.scala:67) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :67) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:32,625] ERROR fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> (kafka.utils.Utils$) >>>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(D >>> efaultEventHandler.scala:67) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :67) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> ... 12 more >>>>>>> [2015-02-17 20:43:32,627] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 1 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition >>> Info.scala:49) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven >>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a >>> pply(DefaultEventHandler.scala:150) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a >>> pply(DefaultEventHandler.scala:149) >>>>>>> at >>>>>>> >>>>> >>> >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal >>> a:59) >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent >>> Handler.scala:149) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv >>> entHandler.scala:95) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:32,628] ERROR Failed to collate messages by >>> topic, >>>>>>> partition due to: fetching topic metadata for topics [Set(test)] >>> from >>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed >>>>>>> (kafka.producer.async.DefaultEventHandler) >>>>>>> [2015-02-17 20:43:32,734] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 2 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D >>> efaultEventHandler.scala:78) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :78) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:32,735] ERROR fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> (kafka.utils.Utils$) >>>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D >>> efaultEventHandler.scala:78) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :78) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> ... 12 more >>>>>>> [2015-02-17 20:43:32,737] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 3 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition >>> Info.scala:49) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven >>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a >>> pply(DefaultEventHandler.scala:150) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a >>> pply(DefaultEventHandler.scala:149) >>>>>>> at >>>>>>> >>>>> >>> >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal >>> a:59) >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent >>> Handler.scala:149) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv >>> entHandler.scala:95) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:32,738] ERROR Failed to collate messages by >>> topic, >>>>>>> partition due to: fetching topic metadata for topics [Set(test)] >>> from >>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed >>>>>>> (kafka.producer.async.DefaultEventHandler) >>>>>>> [2015-02-17 20:43:32,844] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 4 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D >>> efaultEventHandler.scala:78) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :78) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:32,844] ERROR fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> (kafka.utils.Utils$) >>>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D >>> efaultEventHandler.scala:78) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :78) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> ... 12 more >>>>>>> [2015-02-17 20:43:32,846] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 5 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition >>> Info.scala:49) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven >>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a >>> pply(DefaultEventHandler.scala:150) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a >>> pply(DefaultEventHandler.scala:149) >>>>>>> at >>>>>>> >>>>> >>> >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal >>> a:59) >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent >>> Handler.scala:149) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv >>> entHandler.scala:95) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:32,847] ERROR Failed to collate messages by >>> topic, >>>>>>> partition due to: fetching topic metadata for topics [Set(test)] >>> from >>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed >>>>>>> (kafka.producer.async.DefaultEventHandler) >>>>>>> [2015-02-17 20:43:32,953] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 6 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D >>> efaultEventHandler.scala:78) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :78) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:32,954] ERROR fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> (kafka.utils.Utils$) >>>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D >>> efaultEventHandler.scala:78) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :78) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> ... 12 more >>>>>>> [2015-02-17 20:43:32,955] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 7 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartition >>> Info.scala:49) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEven >>> tHandler$$getPartitionListForTopic(DefaultEventHandler.scala:186) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a >>> pply(DefaultEventHandler.scala:150) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.a >>> pply(DefaultEventHandler.scala:149) >>>>>>> at >>>>>>> >>>>> >>> >>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scal >>> a:59) >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEvent >>> Handler.scala:149) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEv >>> entHandler.scala:95) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:32,957] ERROR Failed to collate messages by >>> topic, >>>>>>> partition due to: fetching topic metadata for topics [Set(test)] >>> from >>>>>>> broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed >>>>>>> (kafka.producer.async.DefaultEventHandler) >>>>>>> [2015-02-17 20:43:33,063] WARN Fetching topic metadata with >>> correlation >>>>> id >>>>>>> 8 for topics [Set(test)] from broker >>> [id:0,host:localhost,port:9092] >>>>> failed >>>>>>> (kafka.client.ClientUtils$) >>>>>>> java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D >>> efaultEventHandler.scala:78) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :78) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> [2015-02-17 20:43:33,064] ERROR fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> (kafka.utils.Utils$) >>>>>>> kafka.common.KafkaException: fetching topic metadata for topics >>>>>>> [Set(test)] from broker >>> [ArrayBuffer(id:0,host:localhost,port:9092)] >>>>> failed >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:8 >>> 2) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(D >>> efaultEventHandler.scala:78) >>>>>>> at kafka.utils.Utils$.swallow(Utils.scala:172) >>>>>>> at kafka.utils.Logging$class.swallowError(Logging.scala:106) >>>>>>> at kafka.utils.Utils$.swallowError(Utils.scala:45) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :78) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> Caused by: java.nio.channels.ClosedChannelException >>>>>>> at >>> kafka.network.BlockingChannel.send(BlockingChannel.scala:100) >>>>>>> at >>>>> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProdu >>> cer.scala:72) >>>>>>> at kafka.producer.SyncProducer.send(SyncProducer.scala:113) >>>>>>> at >>>>>>> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58) >>>>>>> ... 12 more >>>>>>> [2015-02-17 20:43:33,066] ERROR Failed to send requests for topics >>> test >>>>>>> with correlation ids in [0,8] >>> (kafka.producer.async.DefaultEventHandler) >>>>>>> [2015-02-17 20:43:33,067] ERROR Error in handling batch of 1 events >>>>>>> (kafka.producer.async.ProducerSendThread) >>>>>>> kafka.common.FailedToSendMessageException: Failed to send messages >>>>> after 3 >>>>>>> tries. >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala >>> :90) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.sc >>> ala:105) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:88) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Pr >>> oducerSendThread.scala:68) >>>>>>> at >>> scala.collection.immutable.Stream.foreach(Stream.scala:547) >>>>>>> at >>>>>>> >>>>> >>> >>> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread. >>> scala:67) >>>>>>> at >>>>>>> >>> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) >>>>>>> >>>>>>> >>>>>>> >>>>> >>>>> >>> >>> >