[ https://issues.apache.org/jira/browse/KAFKA-1066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manikumar resolved KAFKA-1066. ------------------------------ Resolution: Auto Closed Closing inactive issue. The old producer is no longer supported > Reduce logging on producer connection failures > ---------------------------------------------- > > Key: KAFKA-1066 > URL: https://issues.apache.org/jira/browse/KAFKA-1066 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 0.8.0 > Reporter: Jason Rosenberg > Assignee: Jun Rao > > Below is a stack trace from a unit test, where a producer tries to send a > message, but no server is available. > The exception/message logging seems to be inordinately verbose. > I'm thinking a simple change could be to not log full stack traces for simple > things like "Connection refused", etc. Seems it would be fine to just log > the exception message in such cases. > Also, the log levels could be tuned, such that things logged as ERROR > indicate that all possible retries have been attempted, rather than having it > be an ERROR for each step of the retry/failover process. Thus, for a > redundant, clustered service, it should be considered normal that single > nodes will be unavailable (such as when we're doing a rolling restart of the > cluster, etc.). It should only be an ERROR if all brokers/all replicas are > unavailable, etc. This way, we can selectively set our log level to ERROR, > and have it be useful. > This is from one of my unit tests. I am using the default retry count (3) > here, but even if I reduced that, it seems this is a crazy amount of logging. > (I've edited this to remove from each stack trace the portion of > testing/calling code, into the Producer.send() call). > Here's the code snippet that produced the logging below (and note, the server > was not available on the requested port). > KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, > message); > producer.send(msg); > Jason > 599 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 615 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 0 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 628 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > kafka.common.KafkaException: fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:69) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:69) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > Caused by: java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > ... 32 more > 642 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 653 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 1 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 661 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic, partition due to: fetching topic metadata for > topics [Set(test-topic)] from broker > [ArrayBuffer(id:0,host:localhost,port:1025)] failed > 765 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 770 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 2 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 775 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > kafka.common.KafkaException: fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > Caused by: java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > ... 32 more > 780 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 785 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 3 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 790 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic, partition due to: fetching topic metadata for > topics [Set(test-topic)] from broker > [ArrayBuffer(id:0,host:localhost,port:1025)] failed > 892 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 896 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 4 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 900 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > kafka.common.KafkaException: fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > Caused by: java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > ... 32 more > 906 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 913 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 5 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 919 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic, partition due to: fetching topic metadata for > topics [Set(test-topic)] from broker > [ArrayBuffer(id:0,host:localhost,port:1025)] failed > 1021 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 1027 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 6 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 1032 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > kafka.common.KafkaException: fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > Caused by: java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > ... 32 more > 1037 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 1041 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 7 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 1047 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic, partition due to: fetching topic metadata for > topics [Set(test-topic)] from broker > [ArrayBuffer(id:0,host:localhost,port:1025)] failed > 1148 [main] ERROR kafka.producer.SyncProducer - Producer connection to > localhost:1025 unsuccessful > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 1152 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with > correlation id 8 for topics [Set(test-topic)] from broker > [id:0,host:localhost,port:1025] failed > java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > 1157 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > kafka.common.KafkaException: fetching topic metadata for topics > [Set(test-topic)] from broker [ArrayBuffer(id:0,host:localhost,port:1025)] > failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... > Caused by: java.net.ConnectException: Connection refused > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > ... 32 more > 1163 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send > requests for topics test-topic with correlation ids in [0,8] > 1163 [main] WARN com.squareup.kafka.ng.producer.KafkaProducer - Error > sending data to kafka > kafka.common.FailedToSendMessageException: Failed to send messages after 3 > tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > ... -- This message was sent by Atlassian JIRA (v6.4.14#64029)