rondagostino commented on a change in pull request #11503: URL: https://github.com/apache/kafka/pull/11503#discussion_r764369935
########## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ########## @@ -2018,12 +2025,103 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." + " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) - require(!processRoles.contains(ControllerRole) || controllerListeners.nonEmpty, - s"${KafkaConfig.ControllerListenerNamesProp} cannot be empty if the server has the controller role") + val advertisedListenerNames = effectiveAdvertisedListeners.map(_.listenerName).toSet + + // validate KRaft-related configs + val voterAddressSpecsByNodeId = RaftConfig.parseVoterConnections(quorumVoters) + def validateNonEmptyQuorumVotersForKRaft(): Unit = { + if (voterAddressSpecsByNodeId.isEmpty) { + throw new ConfigException(s"If using ${KafkaConfig.ProcessRolesProp}, ${KafkaConfig.QuorumVotersProp} must contain a parseable set of voters.") + } + } + def validateControlPlaneListenerEmptyForKRaft(): Unit = { + require(controlPlaneListenerName.isEmpty, + s"${KafkaConfig.ControlPlaneListenerNameProp} is not supported in KRaft mode. KRaft uses ${KafkaConfig.ControllerListenerNamesProp} instead.") + } + val sourceOfAdvertisedListeners: String = + if (getString(KafkaConfig.AdvertisedListenersProp) != null) + s"${KafkaConfig.AdvertisedListenersProp}" + else + s"${KafkaConfig.ListenersProp}" + def validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker(): Unit = { + require(!advertisedListenerNames.exists(aln => controllerListenerNames.contains(aln.value())), + s"$sourceOfAdvertisedListeners must not contain KRaft controller listeners from ${KafkaConfig.ControllerListenerNamesProp} when ${KafkaConfig.ProcessRolesProp} contains the broker role because Kafka clients that send requests via advertised listeners do not send requests to KRaft controllers -- they only send requests to KRaft brokers.") + } + def validateControllerQuorumVotersMustContainNodeIDForKRaftController(): Unit = { + require(voterAddressSpecsByNodeId.containsKey(nodeId), + s"If ${KafkaConfig.ProcessRolesProp} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}") + } + def validateControllerListenerExistsForKRaftController(): Unit = { + require(controllerListeners.nonEmpty, + s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role") + } + def validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit = { + val listenerNameValues = listeners.map(_.listenerName.value).toSet + require(controllerListenerNames.forall(cln => listenerNameValues.contains(cln)), + s"${KafkaConfig.ControllerListenerNamesProp} must only contain values appearing in the '${KafkaConfig.ListenersProp}' configuration when running the KRaft controller role") + } + def validateAdvertisedListenersNonEmptyForBroker(): Unit = { + require(advertisedListenerNames.nonEmpty, + "There must be at least one advertised listener." + ( + if (processRoles.contains(BrokerRole)) s" Perhaps all listeners appear in ${ControllerListenerNamesProp}?" else "")) + } + if (processRoles == Set(BrokerRole)) { + // KRaft broker-only + validateNonEmptyQuorumVotersForKRaft() + validateControlPlaneListenerEmptyForKRaft() + validateAdvertisedListenersDoesNotContainControllerListenersForKRaftBroker() + // nodeId must not appear in controller.quorum.voters + require(!voterAddressSpecsByNodeId.containsKey(nodeId), + s"If ${KafkaConfig.ProcessRolesProp} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${KafkaConfig.QuorumVotersProp}=${voterAddressSpecsByNodeId.asScala.keySet.toSet}") + // controller.listener.names must be non-empty... + require(controllerListenerNames.exists(_.nonEmpty), + s"${KafkaConfig.ControllerListenerNamesProp} must contain at least one value when running KRaft with just the broker role") + // controller.listener.names are forbidden in listeners... + require(controllerListeners.isEmpty, + s"${KafkaConfig.ControllerListenerNamesProp} must not contain a value appearing in the '${KafkaConfig.ListenersProp}' configuration when running KRaft with just the broker role") + // controller.listener.names must all appear in listener.security.protocol.map + controllerListenerNames.filter(_.nonEmpty).foreach { name => + val listenerName = ListenerName.normalised(name) + if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) { Review comment: I looked around and did find a this loop in the config validation: ``` controllerListenerNames.foreach { name => val listenerName = ListenerName.normalised(name) if (!effectiveListenerSecurityProtocolMap.contains(listenerName)) { throw new ConfigException(s"Controller listener with name ${listenerName.value} defined in " + s"${KafkaConfig.ControllerListenerNamesProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}.") } } ``` As you pointed out, it probably doesn't matter given the small sizes. However, I can't think of a reason to not make it a `val` and only calculate it once, so I made that change. Let me know if I'm missing something and this shouldn't be done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org