filed: https://issues.apache.org/jira/browse/KAFKA-1066
On Tue, Sep 24, 2013 at 12:04 PM, Neha Narkhede <neha.narkh...@gmail.com>wrote: > This makes sense. Please file a JIRA where we can discuss a patch. > > Thanks, > Neha > > > On Tue, Sep 24, 2013 at 9:00 AM, Jason Rosenberg <j...@squareup.com> wrote: > > > I'm wondering if a simple change could be to not log full stack traces > for > > simple things like "Connection refused", etc. Seems it would be fine to > > just log the exception message in such cases. > > > > Also, the log levels could be tuned, such that things logged as ERROR > > indicate that all possible retries have been attempted, rather than > having > > it be an ERROR for each step of the retry/failover process. Thus, for a > > redundant, clustered service, it should be considered normal that single > > nodes will be unavailable (such as when we're doing a rolling restart of > > the cluster, etc.). It should only be an ERROR if all brokers/all > replicas > > are unavailable, etc. This way, we can selectively set our log level to > > ERROR, and have it be useful. > > > > Does this make sense? If so, I can file a Jira along these lines.... > > > > Jason > > > > > > On Mon, Sep 23, 2013 at 9:51 PM, Neha Narkhede <neha.narkh...@gmail.com > > >wrote: > > > > > Agree that this is annoying. We plan to fix this largely as part of the > > > Client > > > Rewrite < > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite > > > >project. > > > If you have ideas on fixing it before that, could you file a JIRA > > > where we can start a discussion? > > > > > > Thanks, > > > Neha > > > > > > > > > On Mon, Sep 23, 2013 at 3:26 PM, Jason Rosenberg <j...@squareup.com> > > wrote: > > > > > > > Sorry for the crazy long log trace here (feel free to ignore this > > message > > > > :)) > > > > > > > > I'm just wondering if there's an easy way to sensibly reduce the > amount > > > of > > > > logging that a kafka produer (0.8) will emit if I try to send a > message > > > > (with ack level 1), if no broker is currently running? > > > > > > > > This is from one of my unit tests. I am using the default retry > count > > > (3) > > > > here, but even if I reduced that, it seems this is a crazy amount of > > > > logging. (I've edited this to remove from each stack trace the > portion > > > of > > > > testing/calling code, into the Producer.send() call). > > > > > > > > Here's the code snippet that produced the logging below (and note, > the > > > > server was not available on the requested port). > > > > > > > > KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, > > T>(topic, > > > > message); > > > > producer.send(msg); > > > > > > > > Jason > > > > > > > > > > > > 599 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 615 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > with > > > > correlation id 0 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 628 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for > > topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > kafka.common.KafkaException: fetching topic metadata for topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > Caused by: java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > ... 32 more > > > > 642 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 653 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > with > > > > correlation id 1 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 661 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed > to > > > > collate messages by topic, partition due to: fetching topic metadata > > for > > > > topics [Set(test-topic)] from broker > > > > [ArrayBuffer(id:0,host:localhost,port:1025)] failed > > > > 765 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 770 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > with > > > > correlation id 2 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 775 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for > > topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > kafka.common.KafkaException: fetching topic metadata for topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > Caused by: java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > ... 32 more > > > > 780 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 785 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > with > > > > correlation id 3 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 790 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed > to > > > > collate messages by topic, partition due to: fetching topic metadata > > for > > > > topics [Set(test-topic)] from broker > > > > [ArrayBuffer(id:0,host:localhost,port:1025)] failed > > > > 892 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 896 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > with > > > > correlation id 4 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 900 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for > > topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > kafka.common.KafkaException: fetching topic metadata for topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > Caused by: java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > ... 32 more > > > > 906 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 913 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > with > > > > correlation id 5 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 919 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed > to > > > > collate messages by topic, partition due to: fetching topic metadata > > for > > > > topics [Set(test-topic)] from broker > > > > [ArrayBuffer(id:0,host:localhost,port:1025)] failed > > > > 1021 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 1027 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > > with > > > > correlation id 6 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 1032 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for > > > topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > kafka.common.KafkaException: fetching topic metadata for topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > Caused by: java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > ... 32 more > > > > 1037 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 1041 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > > with > > > > correlation id 7 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 1047 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed > to > > > > collate messages by topic, partition due to: fetching topic metadata > > for > > > > topics [Set(test-topic)] from broker > > > > [ArrayBuffer(id:0,host:localhost,port:1025)] failed > > > > 1148 [main] ERROR kafka.producer.SyncProducer - Producer connection > to > > > > localhost:1025 unsuccessful > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 1152 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata > > > with > > > > correlation id 8 for topics [Set(test-topic)] from broker > > > > [id:0,host:localhost,port:1025] failed > > > > java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > 1157 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for > > > topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > kafka.common.KafkaException: fetching topic metadata for topics > > > > [Set(test-topic)] from broker > > > [ArrayBuffer(id:0,host:localhost,port:1025)] > > > > failed > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > > > > at > > > > > > > > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > Caused by: java.net.ConnectException: Connection refused > > > > at sun.nio.ch.Net.connect0(Native Method) > > > > at sun.nio.ch.Net.connect(Net.java:465) > > > > at sun.nio.ch.Net.connect(Net.java:457) > > > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > > > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > > > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > > > > at > > > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > > > > at > > > > > > > > > > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > > > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > > > ... 32 more > > > > 1163 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed > to > > > > send requests for topics test-topic with correlation ids in [0,8] > > > > 1163 [main] WARN com.squareup.kafka.ng.producer.KafkaProducer - > Error > > > > sending data to kafka > > > > kafka.common.FailedToSendMessageException: Failed to send messages > > after > > > 3 > > > > tries. > > > > at > > > > > > > > > > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > > > > at kafka.producer.Producer.send(Producer.scala:74) > > > > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > > > > ... > > > > > > > > > >