I'm wondering if a simple change could be to not log full stack traces for
simple things like "Connection refused", etc.  Seems it would be fine to
just log the exception message in such cases.

Also, the log levels could be tuned, such that things logged as ERROR
indicate that all possible retries have been attempted, rather than having
it be an ERROR for each step of the retry/failover process.  Thus, for a
redundant, clustered service, it should be considered normal that single
nodes will be unavailable (such as when we're doing a rolling restart of
the cluster, etc.).  It should only be an ERROR if all brokers/all replicas
are unavailable, etc.  This way, we can selectively set our log level to
ERROR, and have it be useful.

Does this make sense?  If so, I can file a Jira along these lines....

Jason


On Mon, Sep 23, 2013 at 9:51 PM, Neha Narkhede <neha.narkh...@gmail.com>wrote:

> Agree that this is annoying. We plan to fix this largely as part of the
> Client
> Rewrite <https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> >project.
> If you have ideas on fixing it before that, could you file a JIRA
> where we can start a discussion?
>
> Thanks,
> Neha
>
>
> On Mon, Sep 23, 2013 at 3:26 PM, Jason Rosenberg <j...@squareup.com> wrote:
>
> > Sorry for the crazy long log trace here (feel free to ignore this message
> > :))
> >
> > I'm just wondering if there's an easy way to sensibly reduce the amount
> of
> > logging that a kafka produer (0.8) will emit if I try to send a message
> > (with ack level 1), if no broker is currently running?
> >
> > This is from one of my unit tests.  I am using the default retry count
> (3)
> > here, but even if I reduced that, it seems this is a crazy amount of
> > logging.  (I've edited this to remove from each stack trace the portion
> of
> > testing/calling code, into the Producer.send() call).
> >
> > Here's the code snippet that produced the logging below (and note, the
> > server was not available on the requested port).
> >
> >       KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic,
> > message);
> >       producer.send(msg);
> >
> > Jason
> >
> >
> > 599 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 615 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with
> > correlation id 0 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 628 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > Caused by: java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > ... 32 more
> > 642 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 653 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with
> > correlation id 1 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 661 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > collate messages by topic, partition due to: fetching topic metadata for
> > topics [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> > 765 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 770 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with
> > correlation id 2 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 775 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > Caused by: java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > ... 32 more
> > 780 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 785 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with
> > correlation id 3 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 790 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > collate messages by topic, partition due to: fetching topic metadata for
> > topics [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> > 892 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 896 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with
> > correlation id 4 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 900 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > Caused by: java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > ... 32 more
> > 906 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 913 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata with
> > correlation id 5 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 919 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > collate messages by topic, partition due to: fetching topic metadata for
> > topics [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> > 1021 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 1027 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > correlation id 6 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 1032 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for
> topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > Caused by: java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > ... 32 more
> > 1037 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 1041 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > correlation id 7 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > at
> >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 1047 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > collate messages by topic, partition due to: fetching topic metadata for
> > topics [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> > 1148 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > localhost:1025 unsuccessful
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > 1152 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > correlation id 8 for topics [Set(test-topic)] from broker
> > [id:0,host:localhost,port:1025] failed
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> >   ...
> > 1157 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for
> topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(test-topic)] from broker
> [ArrayBuffer(id:0,host:localhost,port:1025)]
> > failed
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> > Caused by: java.net.ConnectException: Connection refused
> > at sun.nio.ch.Net.connect0(Native Method)
> > at sun.nio.ch.Net.connect(Net.java:465)
> > at sun.nio.ch.Net.connect(Net.java:457)
> > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > at
> >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > ... 32 more
> > 1163 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > send requests for topics test-topic with correlation ids in [0,8]
> > 1163 [main] WARN com.squareup.kafka.ng.producer.KafkaProducer  - Error
> > sending data to kafka
> > kafka.common.FailedToSendMessageException: Failed to send messages after
> 3
> > tries.
> > at
> >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > at kafka.producer.Producer.send(Producer.scala:74)
> > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > ...
> >
>

Reply via email to