[ 
https://issues.apache.org/jira/browse/KAFKA-1066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1066.
------------------------------
    Resolution: Auto Closed

Closing inactive issue. The old producer is no longer supported.

> Reduce logging on producer connection failures
> ----------------------------------------------
>
>                 Key: KAFKA-1066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1066
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.0
>            Reporter: Jason Rosenberg
>            Assignee: Jun Rao
>
> Below is a stack trace from a unit test, where a producer tries to send a 
> message, but no server is available.
> The exception/message logging seems to be inordinately verbose.  
> I'm thinking a simple change could be to not log full stack traces for simple 
> things like "Connection refused", etc.  Seems it would be fine to just log 
> the exception message in such cases.
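
A minimal sketch of the suggestion above, under stated assumptions: the class
and method names are hypothetical, nothing here is Kafka code, and
java.util.logging stands in for the logging library the producer actually
uses so that the example is self-contained. Expected network failures such as
"Connection refused" are logged as a single line; anything else keeps its
full stack trace.

```java
import java.net.ConnectException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not part of Kafka: decides how much detail to log.
class ConnectionFailureLogging {
    private static final Logger LOG = Logger.getLogger("producer.sketch");

    // Failures that are routine when a broker is down (assumed set: only
    // ConnectException, which covers "Connection refused").
    static boolean isExpected(Throwable t) {
        return t instanceof ConnectException;
    }

    // One-line description: exception class name plus its message.
    static String summarize(Throwable t) {
        return t.getClass().getName() + ": " + t.getMessage();
    }

    static void logFailure(String what, Throwable t) {
        if (isExpected(t)) {
            // Message only, no stack trace.
            LOG.log(Level.WARNING, what + " (" + summarize(t) + ")");
        } else {
            // Unexpected failure: keep the full stack trace.
            LOG.log(Level.WARNING, what, t);
        }
    }

    public static void main(String[] args) {
        logFailure("Producer connection to localhost:1025 unsuccessful",
                new ConnectException("Connection refused"));
    }
}
```

Which exception types count as "expected" is a policy choice; the sketch only
treats ConnectException that way.
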
> Also, the log levels could be tuned, such that things logged as ERROR 
> indicate that all possible retries have been attempted, rather than having it 
> be an ERROR for each step of the retry/failover process.  Thus, for a 
> redundant, clustered service, it should be considered normal that single 
> nodes will be unavailable (such as when we're doing a rolling restart of the 
> cluster, etc.).  It should only be an ERROR if all brokers/all replicas are 
> unavailable, etc.  This way, we can selectively set our log level to ERROR, 
> and have it be useful.
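
The log-level policy described above could look roughly like the following
sketch. It is an illustration only: RetryLogLevels, sendWithRetries and
levelsUsed are hypothetical names, java.util.logging is used for
self-containment (WARNING and SEVERE standing in for WARN and ERROR), and the
retry loop is not the old producer's actual retry logic. Each failed attempt
is logged as a warning without a stack trace; a single error with the stack
trace is logged only once every attempt has failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch, not Kafka code: per-attempt failures are warnings,
// and the error level is reserved for the point where all attempts failed.
class RetryLogLevels {
    private static final Logger LOG = Logger.getLogger("producer.sketch");

    // Records the level of each log call so the policy can be inspected.
    static final List<Level> levelsUsed = new ArrayList<>();

    static <T> T sendWithRetries(Callable<T> attempt, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        // One initial attempt plus maxRetries retries.
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e;
                // Message only: a single failed attempt is not an error yet.
                log(Level.WARNING, "Attempt " + (i + 1) + " failed: " + e, null);
            }
        }
        // Everything failed: log once at the error level, with the stack trace.
        log(Level.SEVERE, "All " + (maxRetries + 1) + " attempts failed", last);
        throw last;
    }

    private static void log(Level level, String msg, Throwable t) {
        levelsUsed.add(level);
        if (t == null) {
            LOG.log(level, msg);
        } else {
            LOG.log(level, msg, t);
        }
    }
}
```

With this shape, a log level of ERROR shows one entry per failed send rather
than one per connection attempt.
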
> This is from one of my unit tests.  I am using the default retry count (3) 
> here, but even if I reduced that, this seems like an excessive amount of 
> logging.  (I've edited each stack trace to remove the testing/calling code 
> above the Producer.send() call.)
> Here's the code snippet that produced the logging below (note that the 
> server was not available on the requested port).
>       KeyedMessage<Integer, T> msg =
>           new KeyedMessage<Integer, T>(topic, message);
>       producer.send(msg);
> Jason
> 599 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 615 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 0 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 628 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
> kafka.common.KafkaException: fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       ... 32 more
> 642 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 653 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 1 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 661 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to 
> collate messages by topic, partition due to: fetching topic metadata for 
> topics [Set(test-topic)] from broker 
> [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 765 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 770 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 2 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 775 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
> kafka.common.KafkaException: fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       ... 32 more
> 780 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 785 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 3 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 790 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to 
> collate messages by topic, partition due to: fetching topic metadata for 
> topics [Set(test-topic)] from broker 
> [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 892 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 896 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 4 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 900 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
> kafka.common.KafkaException: fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       ... 32 more
> 906 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 913 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 5 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 919 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to 
> collate messages by topic, partition due to: fetching topic metadata for 
> topics [Set(test-topic)] from broker 
> [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 1021 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 1027 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 6 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 1032 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
> kafka.common.KafkaException: fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       ... 32 more
> 1037 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 1041 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 7 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 1047 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to 
> collate messages by topic, partition due to: fetching topic metadata for 
> topics [Set(test-topic)] from broker 
> [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> 1148 [main] ERROR kafka.producer.SyncProducer  - Producer connection to 
> localhost:1025 unsuccessful
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 1152 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with 
> correlation id 8 for topics [Set(test-topic)] from broker 
> [id:0,host:localhost,port:1025] failed
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> 1157 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
> kafka.common.KafkaException: fetching topic metadata for topics 
> [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] 
> failed
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       ... 32 more
> 1163 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send 
> requests for topics test-topic with correlation ids in [0,8]
> 1163 [main] WARN com.squareup.kafka.ng.producer.KafkaProducer  - Error 
> sending data to kafka
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 
> tries.
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>       at kafka.producer.Producer.send(Producer.scala:74)
>       at kafka.javaapi.producer.Producer.send(Producer.scala:32)
>       ...



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
