This makes sense. Please file a JIRA where we can discuss a patch.

Thanks,
Neha


On Tue, Sep 24, 2013 at 9:00 AM, Jason Rosenberg <j...@squareup.com> wrote:

> I'm wondering if a simple change could be to not log full stack traces for
> simple things like "Connection refused", etc.  Seems it would be fine to
> just log the exception message in such cases.
>
> Also, the log levels could be tuned, such that things logged as ERROR
> indicate that all possible retries have been attempted, rather than having
> it be an ERROR for each step of the retry/failover process.  Thus, for a
> redundant, clustered service, it should be considered normal that single
> nodes will be unavailable (such as when we're doing a rolling restart of
> the cluster, etc.).  It should only be an ERROR if all brokers/all replicas
> are unavailable, etc.  This way, we can selectively set our log level to
> ERROR, and have it be useful.
>
> Does this make sense?  If so, I can file a Jira along these lines....
>
> Jason
>
>
> On Mon, Sep 23, 2013 at 9:51 PM, Neha Narkhede <neha.narkh...@gmail.com
> >wrote:
>
> > Agree that this is annoying. We plan to fix this largely as part of the
> > Client
> > Rewrite <
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > >project.
> > If you have ideas on fixing it before that, could you file a JIRA
> > where we can start a discussion?
> >
> > Thanks,
> > Neha
> >
> >
> > On Mon, Sep 23, 2013 at 3:26 PM, Jason Rosenberg <j...@squareup.com>
> wrote:
> >
> > > Sorry for the crazy long log trace here (feel free to ignore this
> message
> > > :))
> > >
> > > I'm just wondering if there's an easy way to sensibly reduce the amount
> > of
> > > logging that a kafka produer (0.8) will emit if I try to send a message
> > > (with ack level 1), if no broker is currently running?
> > >
> > > This is from one of my unit tests.  I am using the default retry count
> > (3)
> > > here, but even if I reduced that, it seems this is a crazy amount of
> > > logging.  (I've edited this to remove from each stack trace the portion
> > of
> > > testing/calling code, into the Producer.send() call).
> > >
> > > Here's the code snippet that produced the logging below (and note, the
> > > server was not available on the requested port).
> > >
> > >       KeyedMessage<Integer, T> msg = new KeyedMessage<Integer,
> T>(topic,
> > > message);
> > >       producer.send(msg);
> > >
> > > Jason
> > >
> > >
> > > 599 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 615 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > > correlation id 0 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 628 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for
> topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > kafka.common.KafkaException: fetching topic metadata for topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > Caused by: java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > ... 32 more
> > > 642 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 653 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > > correlation id 1 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 661 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > > collate messages by topic, partition due to: fetching topic metadata
> for
> > > topics [Set(test-topic)] from broker
> > > [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> > > 765 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 770 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > > correlation id 2 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 775 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for
> topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > kafka.common.KafkaException: fetching topic metadata for topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > Caused by: java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > ... 32 more
> > > 780 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 785 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > > correlation id 3 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 790 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > > collate messages by topic, partition due to: fetching topic metadata
> for
> > > topics [Set(test-topic)] from broker
> > > [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> > > 892 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 896 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > > correlation id 4 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 900 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for
> topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > kafka.common.KafkaException: fetching topic metadata for topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > Caused by: java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > ... 32 more
> > > 906 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 913 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> with
> > > correlation id 5 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 919 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > > collate messages by topic, partition due to: fetching topic metadata
> for
> > > topics [Set(test-topic)] from broker
> > > [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> > > 1021 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 1027 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> > with
> > > correlation id 6 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 1032 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for
> > topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > kafka.common.KafkaException: fetching topic metadata for topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > Caused by: java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > ... 32 more
> > > 1037 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 1041 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> > with
> > > correlation id 7 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 1047 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > > collate messages by topic, partition due to: fetching topic metadata
> for
> > > topics [Set(test-topic)] from broker
> > > [ArrayBuffer(id:0,host:localhost,port:1025)] failed
> > > 1148 [main] ERROR kafka.producer.SyncProducer  - Producer connection to
> > > localhost:1025 unsuccessful
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > 1152 [main] WARN kafka.client.ClientUtils$  - Fetching topic metadata
> > with
> > > correlation id 8 for topics [Set(test-topic)] from broker
> > > [id:0,host:localhost,port:1025] failed
> > > java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > >   ...
> > > 1157 [main] ERROR kafka.utils.Utils$  - fetching topic metadata for
> > topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > kafka.common.KafkaException: fetching topic metadata for topics
> > > [Set(test-topic)] from broker
> > [ArrayBuffer(id:0,host:localhost,port:1025)]
> > > failed
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> > > at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> > > at kafka.utils.Utils$.swallow(Utils.scala:186)
> > > at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> > > at kafka.utils.Utils$.swallowError(Utils.scala:45)
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > > Caused by: java.net.ConnectException: Connection refused
> > > at sun.nio.ch.Net.connect0(Native Method)
> > > at sun.nio.ch.Net.connect(Net.java:465)
> > > at sun.nio.ch.Net.connect(Net.java:457)
> > > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> > > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> > > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> > > at
> > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> > > at
> > >
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> > > ... 32 more
> > > 1163 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to
> > > send requests for topics test-topic with correlation ids in [0,8]
> > > 1163 [main] WARN com.squareup.kafka.ng.producer.KafkaProducer  - Error
> > > sending data to kafka
> > > kafka.common.FailedToSendMessageException: Failed to send messages
> after
> > 3
> > > tries.
> > > at
> > >
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > at kafka.producer.Producer.send(Producer.scala:74)
> > > at kafka.javaapi.producer.Producer.send(Producer.scala:32)
> > > ...
> > >
> >
>

Reply via email to