Hyun-Gul Roh created KAFKA-1138:
-----------------------------------

             Summary: Remote producer uses the hostname defined in broker
                 Key: KAFKA-1138
                 URL: https://issues.apache.org/jira/browse/KAFKA-1138
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 0.8
            Reporter: Hyun-Gul Roh
            Assignee: Jun Rao


When the producer API in the node which is not the broker sends message to a 
broker, only TopicMetadataRequest is sent, but ProducerRequest is not by 
observing the log of "kafka-request.log"
According to my analysis, when the producer api sends ProducerRequest, it seems 
to use the hostname defined in the broker. So, if the hostname is not the one 
registered in DNS, the producer cannot send the ProducerRequest. 


I am attaching the log:

[2013-11-21 15:28:49,464] ERROR Failed to collate messages by topic, partition 
due to: fetching topic metadata for topics [Set(test)] from broker 
[ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed 
(kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:49,465] INFO Back off for 100 ms before retrying send. 
Remaining retries = 1 (kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:49,566] INFO Fetching metadata from broker 
id:0,host:111.111.111.111,port:9092 with correlation id 6 for 1 topic(s) 
Set(test) (kafka.client.ClientUtils$)
[2013-11-21 15:28:49,819] ERROR Producer connection to 111.111.111.111:9092 
unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: 연결이 거부됨
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
        at kafka.utils.Utils$.swallow(Utils.scala:186)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:49,821] WARN Fetching topic metadata with correlation id 6 
for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed 
(kafka.client.ClientUtils$)
java.net.ConnectException: 연결이 거부됨
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
        at kafka.utils.Utils$.swallow(Utils.scala:186)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:49,822] ERROR fetching topic metadata for topics [Set(test)] 
from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed 
(kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] 
from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
        at kafka.utils.Utils$.swallow(Utils.scala:186)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: 연결이 거부됨
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        ... 12 more
[2013-11-21 15:28:49,825] INFO Fetching metadata from broker 
id:0,host:111.111.111.111,port:9092 with correlation id 7 for 1 topic(s) 
Set(test) (kafka.client.ClientUtils$)
[2013-11-21 15:28:50,021] ERROR Producer connection to 111.111.111.111:9092 
unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: 연결이 거부됨
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at 
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:50,024] WARN Fetching topic metadata with correlation id 7 
for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed 
(kafka.client.ClientUtils$)
java.net.ConnectException: 연결이 거부됨
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
        at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
        at 
kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
        at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:50,025] ERROR Failed to collate messages by topic, partition 
due to: fetching topic metadata for topics [Set(test)] from broker 
[ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed 
(kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:50,026] INFO Back off for 100 ms before retrying send. 
Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:50,127] INFO Fetching metadata from broker 
id:0,host:111.111.111.111,port:9092 with correlation id 8 for 1 topic(s) 
Set(test) (kafka.client.ClientUtils$)
[2013-11-21 15:28:50,324] ERROR Producer connection to 111.111.111.111:9092 
unsuccessful (kafka.producer.SyncProducer)
java.net.ConnectException: 연결이 거부됨
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
        at kafka.utils.Utils$.swallow(Utils.scala:186)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:50,326] WARN Fetching topic metadata with correlation id 8 
for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] failed 
(kafka.client.ClientUtils$)
java.net.ConnectException: 연결이 거부됨
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
        at kafka.utils.Utils$.swallow(Utils.scala:186)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-11-21 15:28:50,328] ERROR fetching topic metadata for topics [Set(test)] 
from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed 
(kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] 
from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
        at 
kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at 
kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
        at kafka.utils.Utils$.swallow(Utils.scala:186)
        at kafka.utils.Logging$class.swallowError(Logging.scala:105)
        at kafka.utils.Utils$.swallowError(Utils.scala:45)
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: 연결이 거부됨
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
        at 
kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
        at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        ... 12 more
[2013-11-21 15:28:50,332] ERROR Failed to send requests for topics test with 
correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2013-11-21 15:28:50,333] ERROR Error in handling batch of 1 events 
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 
tries.
        at 
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:254)
        at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
       




--
This message was sent by Atlassian JIRA
(v6.1#6144)

Reply via email to