[ 
https://issues.apache.org/jira/browse/KAFKA-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472150#comment-16472150
 ] 

ASF GitHub Bot commented on KAFKA-6394:
---------------------------------------

hachikuji closed pull request #4897: KAFKA-6394: Add a check to prevent 
misconfiguration of advertised listeners
URL: https://github.com/apache/kafka/pull/4897
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4834791f995..d8a36f77032 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.quota.ClientQuotaCallback
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index c729c8c90f7..ebbc0b8d205 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, 
_}
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{ControlledShutdownRequest, 
ControlledShutdownResponse}
-import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internal.ScramMechanism
 import 
org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
@@ -378,6 +377,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
   }
 
   private[server] def createBrokerInfo: BrokerInfo = {
+    val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}")
+    zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { 
broker =>
+      val commonEndPoints = broker.endPoints.map(e => 
s"${e.host}:${e.port}").intersect(endPoints)
+      require(commonEndPoints.isEmpty, s"Configured end points 
${commonEndPoints.mkString(",")} in" +
+        s" advertised listeners are already registered by broker ${broker.id}")
+    }
+
     val listeners = config.advertisedListeners.map { endpoint =>
       if (endpoint.port == 0)
         endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 79dec265f90..1e1a0a6a2a6 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -596,7 +596,7 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     // Ensure connections are made to brokers before external listener is made 
inaccessible
     describeConfig(externalAdminClient)
 
-    // Update broker keystore for external listener to use invalid listener 
address
+    // Update broker external listener to use invalid listener address
     // any address other than localhost is sufficient to fail (either 
connection or host name verification failure)
     val invalidHost = "192.168.0.1"
     alterAdvertisedListener(adminClient, externalAdminClient, "localhost", 
invalidHost)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
new file mode 100755
index 00000000000..d78821a2ca5
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
+
+class KafkaServerTest extends ZooKeeperTestHarness {
+
+  @Test
+  def testAlreadyRegisteredAdvertisedListeners() {
+    //start a server with a advertised listener
+    val server1 = createServer(1, "myhost", TestUtils.RandomPort)
+
+    //start a server with same advertised listener
+    intercept[IllegalArgumentException] {
+      createServer(2, "myhost", TestUtils.boundPort(server1))
+    }
+
+    //start a server with same host but with different port
+    val server2 = createServer(2, "myhost", TestUtils.RandomPort)
+
+    TestUtils.shutdownServers(Seq(server1, server2))
+  }
+
+  def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
+    val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+    props.put(KafkaConfig.AdvertisedListenersProp, 
s"PLAINTEXT://$hostName:$port")
+    val kafkaConfig = KafkaConfig.fromProps(props)
+    TestUtils.createServer(kafkaConfig)
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Prevent misconfiguration of advertised listeners
> ------------------------------------------------
>
>                 Key: KAFKA-6394
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6394
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Manikumar
>            Priority: Major
>
> We don't really have any protection from misconfiguration of the advertised 
> listeners. Sometimes users will copy the config from one host to another 
> during an upgrade. They may remember to update the broker id, but forget 
> about the advertised listeners. It can be surprisingly difficult to detect 
> this unless you know to look for it (e.g. you might just see a lot of 
> NotLeaderForPartition errors as the fetchers connect to the wrong broker). It 
> may not be totally foolproof, but it's probably enough for the common 
> misconfiguration case to check existing brokers to see whether there are any 
> which have already registered the advertised listener.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to