[
https://issues.apache.org/jira/browse/KAFKA-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13829077#comment-13829077
]
Jun Rao commented on KAFKA-1138:
--------------------------------
Do you think https://issues.apache.org/jira/browse/KAFKA-1092 addresses this
issue?
> Remote producer uses the hostname defined in broker
> ---------------------------------------------------
>
> Key: KAFKA-1138
> URL: https://issues.apache.org/jira/browse/KAFKA-1138
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 0.8
> Reporter: Hyun-Gul Roh
> Assignee: Jun Rao
>
> When the producer API in the node which is not the broker sends message to a
> broker, only TopicMetadataRequest is sent, but ProducerRequest is not by
> observing the log of "kafka-request.log"
> According to my analysis, when the producer api sends ProducerRequest, it
> seems to use the hostname defined in the broker. So, if the hostname is not
> the one registered in DNS, the producer cannot send the ProducerRequest.
> I am attaching the log:
> [2013-11-21 15:28:49,464] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(test)] from broker
> [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:49,465] INFO Back off for 100 ms before retrying send.
> Remaining retries = 1 (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:49,566] INFO Fetching metadata from broker
> id:0,host:111.111.111.111,port:9092 with correlation id 6 for 1 topic(s)
> Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:49,819] ERROR Producer connection to 111.111.111.111:9092
> unsuccessful (kafka.producer.SyncProducer)
> java.net.ConnectException: 연결이 거부됨
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> at kafka.utils.Utils$.swallow(Utils.scala:186)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:49,821] WARN Fetching topic metadata with correlation id 6
> for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092]
> failed (kafka.client.ClientUtils$)
> java.net.ConnectException: 연결이 거부됨
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> at kafka.utils.Utils$.swallow(Utils.scala:186)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:49,822] ERROR fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)]
> failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)]
> from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> at kafka.utils.Utils$.swallow(Utils.scala:186)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> Caused by: java.net.ConnectException: 연결이 거부됨
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> ... 12 more
> [2013-11-21 15:28:49,825] INFO Fetching metadata from broker
> id:0,host:111.111.111.111,port:9092 with correlation id 7 for 1 topic(s)
> Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:50,021] ERROR Producer connection to 111.111.111.111:9092
> unsuccessful (kafka.producer.SyncProducer)
> java.net.ConnectException: 연결이 거부됨
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,024] WARN Fetching topic metadata with correlation id 7
> for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092]
> failed (kafka.client.ClientUtils$)
> java.net.ConnectException: 연결이 거부됨
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
> at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> at
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,025] ERROR Failed to collate messages by topic,
> partition due to: fetching topic metadata for topics [Set(test)] from broker
> [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
> (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,026] INFO Back off for 100 ms before retrying send.
> Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,127] INFO Fetching metadata from broker
> id:0,host:111.111.111.111,port:9092 with correlation id 8 for 1 topic(s)
> Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:50,324] ERROR Producer connection to 111.111.111.111:9092
> unsuccessful (kafka.producer.SyncProducer)
> java.net.ConnectException: 연결이 거부됨
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> at kafka.utils.Utils$.swallow(Utils.scala:186)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,326] WARN Fetching topic metadata with correlation id 8
> for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092]
> failed (kafka.client.ClientUtils$)
> java.net.ConnectException: 연결이 거부됨
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> at kafka.utils.Utils$.swallow(Utils.scala:186)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,328] ERROR fetching topic metadata for topics
> [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)]
> failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)]
> from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
> at kafka.utils.Utils$.swallow(Utils.scala:186)
> at kafka.utils.Logging$class.swallowError(Logging.scala:105)
> at kafka.utils.Utils$.swallowError(Utils.scala:45)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> Caused by: java.net.ConnectException: 연결이 거부됨
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
> at
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
> at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
> ... 12 more
> [2013-11-21 15:28:50,332] ERROR Failed to send requests for topics test with
> correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,333] ERROR Error in handling batch of 1 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> at scala.collection.immutable.Stream.foreach(Stream.scala:254)
> at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
> at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>
--
This message was sent by Atlassian JIRA
(v6.1#6144)