This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ec7ba32 KAFKA-6394; Add a check to prevent misconfiguration of advertised listeners (#4897) ec7ba32 is described below commit ec7ba32af6542c6dbbf264a79804d25a98707971 Author: Manikumar Reddy O <manikumar.re...@gmail.com> AuthorDate: Fri May 11 21:19:49 2018 +0530 KAFKA-6394; Add a check to prevent misconfiguration of advertised listeners (#4897) Do not allow server startup if one of its configured advertised listeners has already been registered by another broker. --- core/src/main/scala/kafka/server/KafkaConfig.scala | 1 - core/src/main/scala/kafka/server/KafkaServer.scala | 8 +++- .../server/DynamicBrokerReconfigurationTest.scala | 2 +- .../scala/unit/kafka/server/KafkaServerTest.scala | 49 ++++++++++++++++++++++ 4 files changed, 57 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e296e26..4069b8e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -37,7 +37,6 @@ import org.apache.kafka.common.metrics.Sensor import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.server.quota.ClientQuotaCallback import scala.collection.JavaConverters._ import scala.collection.Map diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index c729c8c..ebbc0b8 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -43,7 +43,6 @@ import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _} import org.apache.kafka.common.network._ import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{ControlledShutdownRequest, ControlledShutdownResponse} 
-import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internal.ScramMechanism import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache import org.apache.kafka.common.security.{JaasContext, JaasUtils} @@ -378,6 +377,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP } private[server] def createBrokerInfo: BrokerInfo = { + val endPoints = config.advertisedListeners.map(e => s"${e.host}:${e.port}") + zkClient.getAllBrokersInCluster.filter(_.id != config.brokerId).foreach { broker => + val commonEndPoints = broker.endPoints.map(e => s"${e.host}:${e.port}").intersect(endPoints) + require(commonEndPoints.isEmpty, s"Configured end points ${commonEndPoints.mkString(",")} in" + + s" advertised listeners are already registered by broker ${broker.id}") + } + val listeners = config.advertisedListeners.map { endpoint => if (endpoint.port == 0) endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 8b70875..fb96f9d 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -701,7 +701,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet // Ensure connections are made to brokers before external listener is made inaccessible describeConfig(externalAdminClient) - // Update broker keystore for external listener to use invalid listener address + // Update broker external listener to use invalid listener address // any address other than localhost is sufficient to fail (either connection or host name verification failure) val invalidHost = "192.168.0.1" alterAdvertisedListener(adminClient, 
externalAdminClient, "localhost", invalidHost) diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala new file mode 100755 index 0000000..d78821a --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package kafka.server
+
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.Test
+
+class KafkaServerTest extends ZooKeeperTestHarness {
+
+  @Test
+  def testAlreadyRegisteredAdvertisedListeners() {
+    //start a server with a advertised listener
+    val server1 = createServer(1, "myhost", TestUtils.RandomPort)
+
+    //start a server with same advertised listener
+    intercept[IllegalArgumentException] {
+      createServer(2, "myhost", TestUtils.boundPort(server1))
+    }
+
+    //start a server with same host but with different port
+    val server2 = createServer(2, "myhost", TestUtils.RandomPort)
+
+    TestUtils.shutdownServers(Seq(server1, server2))
+  }
+
+  def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
+    val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+    props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$hostName:$port")
+    val kafkaConfig = KafkaConfig.fromProps(props)
+    TestUtils.createServer(kafkaConfig)
+  }
+
+}
-- 
To stop receiving notification emails like this one, please contact j...@apache.org.